1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.regionserver;
20
21 import java.io.EOFException;
22 import java.io.FileNotFoundException;
23 import java.io.IOException;
24 import java.io.InterruptedIOException;
25 import java.io.UnsupportedEncodingException;
26 import java.lang.reflect.Constructor;
27 import java.text.ParseException;
28 import java.util.AbstractList;
29 import java.util.ArrayList;
30 import java.util.Arrays;
31 import java.util.Collection;
32 import java.util.Collections;
33 import java.util.Comparator;
34 import java.util.HashMap;
35 import java.util.HashSet;
36 import java.util.Iterator;
37 import java.util.List;
38 import java.util.Map;
39 import java.util.Map.Entry;
40 import java.util.NavigableMap;
41 import java.util.NavigableSet;
42 import java.util.RandomAccess;
43 import java.util.Set;
44 import java.util.TreeMap;
45 import java.util.concurrent.Callable;
46 import java.util.concurrent.CompletionService;
47 import java.util.concurrent.ConcurrentHashMap;
48 import java.util.concurrent.ConcurrentMap;
49 import java.util.concurrent.ConcurrentSkipListMap;
50 import java.util.concurrent.CountDownLatch;
51 import java.util.concurrent.ExecutionException;
52 import java.util.concurrent.ExecutorCompletionService;
53 import java.util.concurrent.ExecutorService;
54 import java.util.concurrent.Executors;
55 import java.util.concurrent.Future;
56 import java.util.concurrent.FutureTask;
57 import java.util.concurrent.ThreadFactory;
58 import java.util.concurrent.ThreadPoolExecutor;
59 import java.util.concurrent.TimeUnit;
60 import java.util.concurrent.TimeoutException;
61 import java.util.concurrent.atomic.AtomicBoolean;
62 import java.util.concurrent.atomic.AtomicInteger;
63 import java.util.concurrent.atomic.AtomicLong;
64 import java.util.concurrent.locks.Lock;
65 import java.util.concurrent.locks.ReentrantReadWriteLock;
66
67 import org.apache.commons.lang.RandomStringUtils;
68 import org.apache.commons.logging.Log;
69 import org.apache.commons.logging.LogFactory;
70 import org.apache.hadoop.conf.Configuration;
71 import org.apache.hadoop.fs.FileStatus;
72 import org.apache.hadoop.fs.FileSystem;
73 import org.apache.hadoop.fs.Path;
74 import org.apache.hadoop.hbase.Cell;
75 import org.apache.hadoop.hbase.CellScanner;
76 import org.apache.hadoop.hbase.CellUtil;
77 import org.apache.hadoop.hbase.CompoundConfiguration;
78 import org.apache.hadoop.hbase.DoNotRetryIOException;
79 import org.apache.hadoop.hbase.DroppedSnapshotException;
80 import org.apache.hadoop.hbase.HBaseConfiguration;
81 import org.apache.hadoop.hbase.HColumnDescriptor;
82 import org.apache.hadoop.hbase.HConstants;
83 import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
84 import org.apache.hadoop.hbase.HDFSBlocksDistribution;
85 import org.apache.hadoop.hbase.HRegionInfo;
86 import org.apache.hadoop.hbase.HTableDescriptor;
87 import org.apache.hadoop.hbase.KeyValue;
88 import org.apache.hadoop.hbase.KeyValueUtil;
89 import org.apache.hadoop.hbase.NamespaceDescriptor;
90 import org.apache.hadoop.hbase.NotServingRegionException;
91 import org.apache.hadoop.hbase.RegionTooBusyException;
92 import org.apache.hadoop.hbase.TableName;
93 import org.apache.hadoop.hbase.Tag;
94 import org.apache.hadoop.hbase.TagType;
95 import org.apache.hadoop.hbase.UnknownScannerException;
96 import org.apache.hadoop.hbase.backup.HFileArchiver;
97 import org.apache.hadoop.hbase.classification.InterfaceAudience;
98 import org.apache.hadoop.hbase.client.Append;
99 import org.apache.hadoop.hbase.client.Delete;
100 import org.apache.hadoop.hbase.client.Durability;
101 import org.apache.hadoop.hbase.client.Get;
102 import org.apache.hadoop.hbase.client.Increment;
103 import org.apache.hadoop.hbase.client.IsolationLevel;
104 import org.apache.hadoop.hbase.client.Mutation;
105 import org.apache.hadoop.hbase.client.Put;
106 import org.apache.hadoop.hbase.client.RegionReplicaUtil;
107 import org.apache.hadoop.hbase.client.Result;
108 import org.apache.hadoop.hbase.client.RowMutations;
109 import org.apache.hadoop.hbase.client.Scan;
110 import org.apache.hadoop.hbase.conf.ConfigurationManager;
111 import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver;
112 import org.apache.hadoop.hbase.coprocessor.RegionObserver;
113 import org.apache.hadoop.hbase.errorhandling.ForeignExceptionSnare;
114 import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
115 import org.apache.hadoop.hbase.exceptions.RegionInRecoveryException;
116 import org.apache.hadoop.hbase.exceptions.UnknownProtocolException;
117 import org.apache.hadoop.hbase.filter.ByteArrayComparable;
118 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
119 import org.apache.hadoop.hbase.filter.FilterWrapper;
120 import org.apache.hadoop.hbase.filter.IncompatibleFilterException;
121 import org.apache.hadoop.hbase.io.HeapSize;
122 import org.apache.hadoop.hbase.io.TimeRange;
123 import org.apache.hadoop.hbase.io.hfile.BlockCache;
124 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
125 import org.apache.hadoop.hbase.io.hfile.HFile;
126 import org.apache.hadoop.hbase.ipc.CallerDisconnectedException;
127 import org.apache.hadoop.hbase.ipc.RpcCallContext;
128 import org.apache.hadoop.hbase.ipc.RpcServer;
129 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
130 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
131 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
132 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse.CompactionState;
133 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
134 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceCall;
135 import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.RegionLoad;
136 import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.StoreSequenceId;
137 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
138 import org.apache.hadoop.hbase.protobuf.generated.WALProtos;
139 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
140 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor;
141 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor.FlushAction;
142 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor.StoreFlushDescriptor;
143 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor;
144 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor.EventType;
145 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor;
146 import org.apache.hadoop.hbase.regionserver.MultiVersionConsistencyControl.WriteEntry;
147 import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
148 import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState;
149 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
150 import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController;
151 import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputControllerFactory;
152 import org.apache.hadoop.hbase.regionserver.compactions.NoLimitCompactionThroughputController;
153 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
154 import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL;
155 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
156 import org.apache.hadoop.hbase.regionserver.wal.ReplayHLogKey;
157 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
158 import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
159 import org.apache.hadoop.hbase.security.User;
160 import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
161 import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
162 import org.apache.hadoop.hbase.util.ByteStringer;
163 import org.apache.hadoop.hbase.util.Bytes;
164 import org.apache.hadoop.hbase.util.CancelableProgressable;
165 import org.apache.hadoop.hbase.util.ClassSize;
166 import org.apache.hadoop.hbase.util.CompressionTest;
167 import org.apache.hadoop.hbase.util.Counter;
168 import org.apache.hadoop.hbase.util.EncryptionTest;
169 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
170 import org.apache.hadoop.hbase.util.FSTableDescriptors;
171 import org.apache.hadoop.hbase.util.FSUtils;
172 import org.apache.hadoop.hbase.util.HashedBytes;
173 import org.apache.hadoop.hbase.util.Pair;
174 import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
175 import org.apache.hadoop.hbase.util.Threads;
176 import org.apache.hadoop.hbase.wal.WAL;
177 import org.apache.hadoop.hbase.wal.WALFactory;
178 import org.apache.hadoop.hbase.wal.WALKey;
179 import org.apache.hadoop.hbase.wal.WALSplitter;
180 import org.apache.hadoop.hbase.wal.WALSplitter.MutationReplay;
181 import org.apache.hadoop.io.MultipleIOException;
182 import org.apache.hadoop.util.StringUtils;
183 import org.apache.htrace.Trace;
184 import org.apache.htrace.TraceScope;
185
186 import com.google.common.annotations.VisibleForTesting;
187 import com.google.common.base.Optional;
188 import com.google.common.base.Preconditions;
189 import com.google.common.collect.Lists;
190 import com.google.common.collect.Maps;
191 import com.google.common.io.Closeables;
192 import com.google.protobuf.ByteString;
193 import com.google.protobuf.Descriptors;
194 import com.google.protobuf.Message;
195 import com.google.protobuf.RpcCallback;
196 import com.google.protobuf.RpcController;
197 import com.google.protobuf.Service;
198 import com.google.protobuf.TextFormat;
199
200 @InterfaceAudience.Private
201 public class HRegion implements HeapSize, PropagatingConfigurationObserver, Region {
// Logger shared by all HRegion instances.
202 public static final Log LOG = LogFactory.getLog(HRegion.class);
203
// Configuration key for the region-wide default of on-demand column family loading. The
// constructor reads it as a boolean (default true) into isLoadingCfsOnDemandDefault.
204 public static final String LOAD_CFS_ON_DEMAND_CONFIG_KEY =
205 "hbase.hregion.scan.loadColumnFamiliesOnDemand";
206
207
208
209
210
211
212
// Value of MAX_WAIT_FOR_SEQ_ID_KEY as read from the region configuration by the constructor,
// falling back to DEFAULT_MAX_WAIT_FOR_SEQ_ID. The ".ms" suffix of the key suggests the unit is
// milliseconds.
213 private final int maxWaitForSeqId;
214 private static final String MAX_WAIT_FOR_SEQ_ID_KEY = "hbase.hregion.max.wait.for.sequenceid.ms";
215 private static final int DEFAULT_MAX_WAIT_FOR_SEQ_ID = 30000;
216
217
218
219
220
// Durability the constructor substitutes when the table descriptor asks for
// Durability.USE_DEFAULT.
221 private static final Durability DEFAULT_DURABILITY = Durability.SYNC_WAL;
222
// Closed flag; explicitly reset to false at the end of initializeRegionInternals().
223 final AtomicBoolean closed = new AtomicBoolean(false);
224
225
226
227
228
// Closing flag; explicitly reset to false at the end of initializeRegionInternals().
229 final AtomicBoolean closing = new AtomicBoolean(false);
230
231
232
233
234
// NOTE(review): presumably the highest sequence id known to be covered by a completed flush;
// only the NO_SEQNUM initial value is visible in this part of the file - confirm against the
// flush code.
235 private volatile long maxFlushedSeqId = HConstants.NO_SEQNUM;
236
237
238
239
240
241
// NOTE(review): presumably the sequence id of the most recent flush operation; starts at
// NO_SEQNUM - confirm against the flush code.
242 private volatile long lastFlushOpSeqId = HConstants.NO_SEQNUM;
243
244
245
246
247
248
249
250
251
252
253
254
// Region sequence id counter, starting at -1.
// NOTE(review): presumably the value returned by getSequenceId(), which this file hands to
// WALUtil.writeRegionEventMarker - confirm.
255 private final AtomicLong sequenceId = new AtomicLong(-1L);
256
257
258
259
260
261
// lastReplayedOpenRegionSeqId is set by initializeRegionInternals() to the maximum sequence id
// returned by initializeRegionStores(). Both fields start at -1.
262 protected volatile long lastReplayedOpenRegionSeqId = -1L;
263 protected volatile long lastReplayedCompactionSeqId = -1L;
264
265
266
267
268
269
270
271
272
// Per-row lock contexts, keyed by the hashed row bytes.
273 private final ConcurrentHashMap<HashedBytes, RowLockContext> lockedRows =
274 new ConcurrentHashMap<HashedBytes, RowLockContext>();
275
// Stores of this region keyed by column family name; populated by initializeRegionStores().
276 protected final Map<byte[], Store> stores = new ConcurrentSkipListMap<byte[], Store>(
277 Bytes.BYTES_RAWCOMPARATOR);
278
279
// NOTE(review): presumably coprocessor endpoint services keyed by service name - confirm.
280 private Map<String, Service> coprocessorServiceHandlers = Maps.newHashMap();
281
// Running memstore size of this region; adjusted through addAndGetGlobalMemstoreSize().
282 public final AtomicLong memstoreSize = new AtomicLong(0);
283
284
// Counters for mutations applied without the WAL (names suggest count and data volume).
285 final Counter numMutationsWithoutWAL = new Counter();
286 final Counter dataInMemoryWithoutWAL = new Counter();
287
288
// Counters for check-and-mutate outcomes.
289 final Counter checkAndMutateChecksPassed = new Counter();
290 final Counter checkAndMutateChecksFailed = new Counter();
291
292
// Counters for read and write requests.
293 final Counter readRequestsCount = new Counter();
294 final Counter writeRequestsCount = new Counter();
295
296
// Counter for blocked requests.
297 private final Counter blockedRequestsCount = new Counter();
298
299
// Compaction statistics.
300 final AtomicLong compactionsFinished = new AtomicLong(0L);
301 final AtomicLong compactionNumFilesCompacted = new AtomicLong(0L);
302 final AtomicLong compactionNumBytesCompacted = new AtomicLong(0L);
303
// Collaborators handed to the constructor. conf is a CompoundConfiguration layering the table
// descriptor's configuration and values on top of baseConf (the configuration passed in);
// comparator is taken from the region info of fs.
304 private final WAL wal;
305 private final HRegionFileSystem fs;
306 protected final Configuration conf;
307 private final Configuration baseConf;
308 private final KeyValue.KVComparator comparator;
// Read from "hbase.rowlock.wait.duration", defaulting to DEFAULT_ROWLOCK_WAIT_DURATION.
309 private final int rowLockWaitDuration;
310 static final int DEFAULT_ROWLOCK_WAIT_DURATION = 30000;
311
312
313
314
315
316
317
// Read from "hbase.busy.wait.duration", defaulting to DEFAULT_BUSY_WAIT_DURATION.
318 final long busyWaitDuration;
319 static final long DEFAULT_BUSY_WAIT_DURATION = HConstants.DEFAULT_HBASE_RPC_TIMEOUT;
320
321
322
323
// Read from "hbase.busy.wait.multiplier.max" (default 2). The constructor rejects settings
// whose product with busyWaitDuration is not positive.
324 final int maxBusyWaitMultiplier;
325
326
327
// Read from "hbase.ipc.client.call.purge.timeout", defaulting to twice
// HConstants.DEFAULT_HBASE_RPC_TIMEOUT.
328 final long maxBusyWaitDuration;
329
330
// Default for "hbase.hregion.row.processor.timeout" (see rowProcessorTimeout).
331 static final long DEFAULT_ROW_PROCESSOR_TIMEOUT = 60 * 1000L;
332 final ExecutorService rowProcessorExecutor = Executors.newCachedThreadPool();
333
// Read points registered per scanner; consulted (under its own monitor) by
// getSmallestReadPoint().
334 private final ConcurrentHashMap<RegionScanner, Long> scannerReadPoints;
335
336
337
338
339 private long openSeqNum = HConstants.NO_SEQNUM;
340
341
342
343
344
// Overwritten in the constructor from LOAD_CFS_ON_DEMAND_CONFIG_KEY (default true).
345 private boolean isLoadingCfsOnDemandDefault = false;
346
347 private final AtomicInteger majorInProgress = new AtomicInteger(0);
348 private final AtomicInteger minorInProgress = new AtomicInteger(0);
349
350
351
352
353
354
355
// Maximum sequence id per store, filled in by initializeRegionStores() and passed to
// replayRecoveredEditsIfAny().
356 Map<byte[], Long> maxSeqIdInStores = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
357
358
359 private PrepareFlushResult prepareFlushResult = null;
360
361
362
363
// Overwritten in the constructor from HConstants.DISALLOW_WRITES_IN_RECOVERING.
364 private boolean disallowWritesInRecovering = false;
365
366
// Set to true by the constructor when the region server lists this region's encoded name among
// its recovering regions. Also selects the larger sequence-id bump in
// initializeRegionInternals().
367 private volatile boolean isRecovering = false;
368
// Initialised to Optional.absent() by the constructor.
369 private volatile Optional<ConfigurationManager> configurationManager;
370
371
372
373
374
375
376 public long getSmallestReadPoint() {
377 long minimumReadPoint;
378
379
380
381 synchronized(scannerReadPoints) {
382 minimumReadPoint = mvcc.memstoreReadPoint();
383
384 for (Long readPoint: this.scannerReadPoints.values()) {
385 if (readPoint < minimumReadPoint) {
386 minimumReadPoint = readPoint;
387 }
388 }
389 }
390 return minimumReadPoint;
391 }
392
393
394
395
396
397 static class WriteState {
398
399 volatile boolean flushing = false;
400
401 volatile boolean flushRequested = false;
402
403 volatile int compacting = 0;
404
405 volatile boolean writesEnabled = true;
406
407 volatile boolean readOnly = false;
408
409
410 volatile boolean readsEnabled = true;
411
412
413
414
415
416
417 synchronized void setReadOnly(final boolean onOff) {
418 this.writesEnabled = !onOff;
419 this.readOnly = onOff;
420 }
421
422 boolean isReadOnly() {
423 return this.readOnly;
424 }
425
426 boolean isFlushRequested() {
427 return this.flushRequested;
428 }
429
430 void setReadsEnabled(boolean readsEnabled) {
431 this.readsEnabled = readsEnabled;
432 }
433
434 static final long HEAP_SIZE = ClassSize.align(
435 ClassSize.OBJECT + 5 * Bytes.SIZEOF_BOOLEAN);
436 }
437
438
439
440
441
442
443
444 public static class FlushResultImpl implements FlushResult {
445 final Result result;
446 final String failureReason;
447 final long flushSequenceId;
448 final boolean wroteFlushWalMarker;
449
450
451
452
453
454
455
456
457 FlushResultImpl(Result result, long flushSequenceId) {
458 this(result, flushSequenceId, null, false);
459 assert result == Result.FLUSHED_NO_COMPACTION_NEEDED || result == Result
460 .FLUSHED_COMPACTION_NEEDED;
461 }
462
463
464
465
466
467
468 FlushResultImpl(Result result, String failureReason, boolean wroteFlushMarker) {
469 this(result, -1, failureReason, wroteFlushMarker);
470 assert result == Result.CANNOT_FLUSH_MEMSTORE_EMPTY || result == Result.CANNOT_FLUSH;
471 }
472
473
474
475
476
477
478
479 FlushResultImpl(Result result, long flushSequenceId, String failureReason,
480 boolean wroteFlushMarker) {
481 this.result = result;
482 this.flushSequenceId = flushSequenceId;
483 this.failureReason = failureReason;
484 this.wroteFlushWalMarker = wroteFlushMarker;
485 }
486
487
488
489
490
491
492 @Override
493 public boolean isFlushSucceeded() {
494 return result == Result.FLUSHED_NO_COMPACTION_NEEDED || result == Result
495 .FLUSHED_COMPACTION_NEEDED;
496 }
497
498
499
500
501
502 @Override
503 public boolean isCompactionNeeded() {
504 return result == Result.FLUSHED_COMPACTION_NEEDED;
505 }
506
507 @Override
508 public String toString() {
509 return new StringBuilder()
510 .append("flush result:").append(result).append(", ")
511 .append("failureReason:").append(failureReason).append(",")
512 .append("flush seq id").append(flushSequenceId).toString();
513 }
514
515 @Override
516 public Result getResult() {
517 return result;
518 }
519 }
520
521
522 @VisibleForTesting
523 static class PrepareFlushResult {
524 final FlushResult result;
525 final TreeMap<byte[], StoreFlushContext> storeFlushCtxs;
526 final TreeMap<byte[], List<Path>> committedFiles;
527 final TreeMap<byte[], Long> storeFlushableSize;
528 final long startTime;
529 final long flushOpSeqId;
530 final long flushedSeqId;
531 final long totalFlushableSize;
532
533
534 PrepareFlushResult(FlushResult result, long flushSeqId) {
535 this(result, null, null, null, Math.max(0, flushSeqId), 0, 0, 0);
536 }
537
538
539 PrepareFlushResult(
540 TreeMap<byte[], StoreFlushContext> storeFlushCtxs,
541 TreeMap<byte[], List<Path>> committedFiles,
542 TreeMap<byte[], Long> storeFlushableSize, long startTime, long flushSeqId,
543 long flushedSeqId, long totalFlushableSize) {
544 this(null, storeFlushCtxs, committedFiles, storeFlushableSize, startTime,
545 flushSeqId, flushedSeqId, totalFlushableSize);
546 }
547
548 private PrepareFlushResult(
549 FlushResult result,
550 TreeMap<byte[], StoreFlushContext> storeFlushCtxs,
551 TreeMap<byte[], List<Path>> committedFiles,
552 TreeMap<byte[], Long> storeFlushableSize, long startTime, long flushSeqId,
553 long flushedSeqId, long totalFlushableSize) {
554 this.result = result;
555 this.storeFlushCtxs = storeFlushCtxs;
556 this.committedFiles = committedFiles;
557 this.storeFlushableSize = storeFlushableSize;
558 this.startTime = startTime;
559 this.flushOpSeqId = flushSeqId;
560 this.flushedSeqId = flushedSeqId;
561 this.totalFlushableSize = totalFlushableSize;
562 }
563
564 public FlushResult getResult() {
565 return this.result;
566 }
567 }
568
// Flush/compaction/read-only flags of this region; see WriteState.
569 final WriteState writestate = new WriteState();
570
// memstoreFlushSize is set by setHTableSpecificConf() from the table descriptor, or from
// HConstants.HREGION_MEMSTORE_FLUSH_SIZE when the descriptor value is not positive.
571 long memstoreFlushSize;
// Read from "hbase.hregion.keyvalue.timestamp.slop.millisecs", defaulting to
// HConstants.LATEST_TIMESTAMP.
572 final long timestampSlop;
// Read from "hbase.hregion.row.processor.timeout", defaulting to DEFAULT_ROW_PROCESSOR_TIMEOUT.
573 final long rowProcessorTimeout;
574
575
// Per-store flush timestamps; seeded with the current time for every store by
// initializeRegionInternals().
576 private final ConcurrentMap<Store, Long> lastStoreFlushTimeMap =
577 new ConcurrentHashMap<Store, Long>();
578
// Hosting region server services; may be null, in which case no coprocessor host or metrics are
// created by the constructor.
579 final RegionServerServices rsServices;
580 private RegionServerAccounting rsAccounting;
// Read from MEMSTORE_PERIODIC_FLUSH_INTERVAL in the constructor.
581 private long flushCheckInterval;
582
// Read from MEMSTORE_FLUSH_PER_CHANGES in the constructor; must not exceed
// MAX_FLUSH_PER_CHANGES.
583 private long flushPerChanges;
// memstoreFlushSize multiplied by HConstants.HREGION_MEMSTORE_BLOCK_MULTIPLIER; computed in
// setHTableSpecificConf().
584 private long blockingMemStoreSize;
// Read from HConstants.THREAD_WAKE_FREQUENCY, defaulting to 10 * 1000.
585 final long threadWakeFrequency;
586
587 final ReentrantReadWriteLock lock =
588 new ReentrantReadWriteLock();
589
590
591 private final ReentrantReadWriteLock updatesLock =
592 new ReentrantReadWriteLock();
593 private boolean splitRequest;
594 private byte[] explicitSplitPoint = null;
595
// MVCC state; initialised with the region's max sequence id in initializeRegionStores() and
// queried for the memstore read point in getSmallestReadPoint().
596 private final MultiVersionConsistencyControl mvcc =
597 new MultiVersionConsistencyControl();
598
599
// Created by the constructor only when rsServices is non-null; otherwise stays null.
600 private RegionCoprocessorHost coprocessorHost;
601
602 private HTableDescriptor htableDescriptor = null;
// Both policies are created in initializeRegionInternals().
603 private RegionSplitPolicy splitPolicy;
604 private FlushPolicy flushPolicy;
605
// Metrics objects are null when the region is built without rsServices.
606 private final MetricsRegion metricsRegion;
607 private final MetricsRegionWrapperImpl metricsRegionWrapper;
// Table descriptor durability, with USE_DEFAULT replaced by DEFAULT_DURABILITY.
608 private final Durability durability;
// Always false for tables in the system namespace; otherwise read from
// HConstants.ENABLE_CLIENT_BACKPRESSURE.
609 private final boolean regionStatsEnabled;
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
/**
 * Convenience constructor that wraps the table directory, filesystem and region info in a new
 * {@code HRegionFileSystem} and delegates to the {@code HRegionFileSystem}-based constructor.
 *
 * @param tableDir table directory handed to the new HRegionFileSystem
 * @param wal write-ahead log for this region
 * @param fs filesystem handed to the new HRegionFileSystem
 * @param confParam base configuration; also used to build the HRegionFileSystem
 * @param regionInfo descriptor of the region being constructed
 * @param htd table descriptor; must not be null (checked by the delegate constructor)
 * @param rsServices hosting region server services, may be null
 * @deprecated marked deprecated in the source; the HRegionFileSystem-based constructor this
 *             delegates to is presumably the replacement.
 */
631 @Deprecated
632 public HRegion(final Path tableDir, final WAL wal, final FileSystem fs,
633 final Configuration confParam, final HRegionInfo regionInfo,
634 final HTableDescriptor htd, final RegionServerServices rsServices) {
635 this(new HRegionFileSystem(confParam, fs, tableDir, regionInfo),
636 wal, confParam, htd, rsServices);
637 }
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655 public HRegion(final HRegionFileSystem fs, final WAL wal, final Configuration confParam,
656 final HTableDescriptor htd, final RegionServerServices rsServices) {
657 if (htd == null) {
658 throw new IllegalArgumentException("Need table descriptor");
659 }
660
661 if (confParam instanceof CompoundConfiguration) {
662 throw new IllegalArgumentException("Need original base configuration");
663 }
664
665 this.comparator = fs.getRegionInfo().getComparator();
666 this.wal = wal;
667 this.fs = fs;
668
669
670 this.baseConf = confParam;
671 this.conf = new CompoundConfiguration()
672 .add(confParam)
673 .addStringMap(htd.getConfiguration())
674 .addWritableMap(htd.getValues());
675 this.flushCheckInterval = conf.getInt(MEMSTORE_PERIODIC_FLUSH_INTERVAL,
676 DEFAULT_CACHE_FLUSH_INTERVAL);
677 this.flushPerChanges = conf.getLong(MEMSTORE_FLUSH_PER_CHANGES, DEFAULT_FLUSH_PER_CHANGES);
678 if (this.flushPerChanges > MAX_FLUSH_PER_CHANGES) {
679 throw new IllegalArgumentException(MEMSTORE_FLUSH_PER_CHANGES + " can not exceed "
680 + MAX_FLUSH_PER_CHANGES);
681 }
682 this.rowLockWaitDuration = conf.getInt("hbase.rowlock.wait.duration",
683 DEFAULT_ROWLOCK_WAIT_DURATION);
684
685 this.maxWaitForSeqId = conf.getInt(MAX_WAIT_FOR_SEQ_ID_KEY, DEFAULT_MAX_WAIT_FOR_SEQ_ID);
686 this.isLoadingCfsOnDemandDefault = conf.getBoolean(LOAD_CFS_ON_DEMAND_CONFIG_KEY, true);
687 this.htableDescriptor = htd;
688 this.rsServices = rsServices;
689 this.threadWakeFrequency = conf.getLong(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
690 setHTableSpecificConf();
691 this.scannerReadPoints = new ConcurrentHashMap<RegionScanner, Long>();
692
693 this.busyWaitDuration = conf.getLong(
694 "hbase.busy.wait.duration", DEFAULT_BUSY_WAIT_DURATION);
695 this.maxBusyWaitMultiplier = conf.getInt("hbase.busy.wait.multiplier.max", 2);
696 if (busyWaitDuration * maxBusyWaitMultiplier <= 0L) {
697 throw new IllegalArgumentException("Invalid hbase.busy.wait.duration ("
698 + busyWaitDuration + ") or hbase.busy.wait.multiplier.max ("
699 + maxBusyWaitMultiplier + "). Their product should be positive");
700 }
701 this.maxBusyWaitDuration = conf.getLong("hbase.ipc.client.call.purge.timeout",
702 2 * HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
703
704
705
706
707
708
709
710 this.timestampSlop = conf.getLong(
711 "hbase.hregion.keyvalue.timestamp.slop.millisecs",
712 HConstants.LATEST_TIMESTAMP);
713
714
715
716
717
718 this.rowProcessorTimeout = conf.getLong(
719 "hbase.hregion.row.processor.timeout", DEFAULT_ROW_PROCESSOR_TIMEOUT);
720 this.durability = htd.getDurability() == Durability.USE_DEFAULT
721 ? DEFAULT_DURABILITY
722 : htd.getDurability();
723 if (rsServices != null) {
724 this.rsAccounting = this.rsServices.getRegionServerAccounting();
725
726
727 this.coprocessorHost = new RegionCoprocessorHost(this, rsServices, conf);
728 this.metricsRegionWrapper = new MetricsRegionWrapperImpl(this);
729 this.metricsRegion = new MetricsRegion(this.metricsRegionWrapper);
730
731 Map<String, Region> recoveringRegions = rsServices.getRecoveringRegions();
732 String encodedName = getRegionInfo().getEncodedName();
733 if (recoveringRegions != null && recoveringRegions.containsKey(encodedName)) {
734 this.isRecovering = true;
735 recoveringRegions.put(encodedName, this);
736 }
737 } else {
738 this.metricsRegionWrapper = null;
739 this.metricsRegion = null;
740 }
741 if (LOG.isDebugEnabled()) {
742
743 LOG.debug("Instantiated " + this);
744 }
745
746
747 this.disallowWritesInRecovering =
748 conf.getBoolean(HConstants.DISALLOW_WRITES_IN_RECOVERING,
749 HConstants.DEFAULT_DISALLOW_WRITES_IN_RECOVERING_CONFIG);
750 configurationManager = Optional.absent();
751
752
753 this.regionStatsEnabled = htd.getTableName().getNamespaceAsString().equals(
754 NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR) ?
755 false :
756 conf.getBoolean(HConstants.ENABLE_CLIENT_BACKPRESSURE,
757 HConstants.DEFAULT_ENABLE_CLIENT_BACKPRESSURE);
758 }
759
760 void setHTableSpecificConf() {
761 if (this.htableDescriptor == null) return;
762 long flushSize = this.htableDescriptor.getMemStoreFlushSize();
763
764 if (flushSize <= 0) {
765 flushSize = conf.getLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE,
766 HTableDescriptor.DEFAULT_MEMSTORE_FLUSH_SIZE);
767 }
768 this.memstoreFlushSize = flushSize;
769 this.blockingMemStoreSize = this.memstoreFlushSize *
770 conf.getLong(HConstants.HREGION_MEMSTORE_BLOCK_MULTIPLIER,
771 HConstants.DEFAULT_HREGION_MEMSTORE_BLOCK_MULTIPLIER);
772 }
773
774
775
776
777
778
779
780
781
782 @Deprecated
783 public long initialize() throws IOException {
784 return initialize(null);
785 }
786
787
788
789
790
791
792
793
794 private long initialize(final CancelableProgressable reporter) throws IOException {
795 MonitoredTask status = TaskMonitor.get().createStatus("Initializing region " + this);
796 long nextSeqId = -1;
797 try {
798 nextSeqId = initializeRegionInternals(reporter, status);
799 return nextSeqId;
800 } finally {
801
802
803 if (nextSeqId == -1) {
804 status.abort("Exception during region " + getRegionInfo().getRegionNameAsString() +
805 " initialization.");
806 }
807 }
808 }
809
810 private long initializeRegionInternals(final CancelableProgressable reporter,
811 final MonitoredTask status) throws IOException {
812 if (coprocessorHost != null) {
813 status.setStatus("Running coprocessor pre-open hook");
814 coprocessorHost.preOpen();
815 }
816
817
818 status.setStatus("Writing region info on filesystem");
819 fs.checkRegionInfoOnFilesystem();
820
821
822 status.setStatus("Initializing all the Stores");
823 long maxSeqId = initializeRegionStores(reporter, status, false);
824 this.lastReplayedOpenRegionSeqId = maxSeqId;
825
826 this.writestate.setReadOnly(ServerRegionReplicaUtil.isReadOnly(this));
827 this.writestate.flushRequested = false;
828 this.writestate.compacting = 0;
829
830 if (this.writestate.writesEnabled) {
831
832 status.setStatus("Cleaning up temporary data from old regions");
833 fs.cleanupTempDir();
834 }
835
836 if (this.writestate.writesEnabled) {
837 status.setStatus("Cleaning up detritus from prior splits");
838
839
840
841 fs.cleanupAnySplitDetritus();
842 fs.cleanupMergesDir();
843 }
844
845
846 this.splitPolicy = RegionSplitPolicy.create(this, conf);
847
848
849 this.flushPolicy = FlushPolicyFactory.create(this, conf);
850
851 long lastFlushTime = EnvironmentEdgeManager.currentTime();
852 for (Store store: stores.values()) {
853 this.lastStoreFlushTimeMap.put(store, lastFlushTime);
854 }
855
856
857
858 long nextSeqid = maxSeqId;
859
860
861
862
863 if (this.writestate.writesEnabled) {
864 nextSeqid = WALSplitter.writeRegionSequenceIdFile(this.fs.getFileSystem(), this.fs
865 .getRegionDir(), nextSeqid, (this.isRecovering ? (this.flushPerChanges + 10000000) : 1));
866 } else {
867 nextSeqid++;
868 }
869
870 LOG.info("Onlined " + this.getRegionInfo().getShortNameToLog() +
871 "; next sequenceid=" + nextSeqid);
872
873
874 this.closing.set(false);
875 this.closed.set(false);
876
877 if (coprocessorHost != null) {
878 status.setStatus("Running coprocessor post-open hooks");
879 coprocessorHost.postOpen();
880 }
881
882 status.markComplete("Region opened successfully");
883 return nextSeqid;
884 }
885
886 private long initializeRegionStores(final CancelableProgressable reporter, MonitoredTask status,
887 boolean warmupOnly)
888 throws IOException {
889
890
891
892 long maxSeqId = -1;
893
894 long maxMemstoreTS = -1;
895
896 if (!htableDescriptor.getFamilies().isEmpty()) {
897
898 ThreadPoolExecutor storeOpenerThreadPool =
899 getStoreOpenAndCloseThreadPool("StoreOpener-" + this.getRegionInfo().getShortNameToLog());
900 CompletionService<HStore> completionService =
901 new ExecutorCompletionService<HStore>(storeOpenerThreadPool);
902
903
904 for (final HColumnDescriptor family : htableDescriptor.getFamilies()) {
905 status.setStatus("Instantiating store for column family " + family);
906 completionService.submit(new Callable<HStore>() {
907 @Override
908 public HStore call() throws IOException {
909 return instantiateHStore(family);
910 }
911 });
912 }
913 boolean allStoresOpened = false;
914 try {
915 for (int i = 0; i < htableDescriptor.getFamilies().size(); i++) {
916 Future<HStore> future = completionService.take();
917 HStore store = future.get();
918 this.stores.put(store.getFamily().getName(), store);
919
920 long storeMaxSequenceId = store.getMaxSequenceId();
921 maxSeqIdInStores.put(store.getColumnFamilyName().getBytes(),
922 storeMaxSequenceId);
923 if (maxSeqId == -1 || storeMaxSequenceId > maxSeqId) {
924 maxSeqId = storeMaxSequenceId;
925 }
926 long maxStoreMemstoreTS = store.getMaxMemstoreTS();
927 if (maxStoreMemstoreTS > maxMemstoreTS) {
928 maxMemstoreTS = maxStoreMemstoreTS;
929 }
930 }
931 allStoresOpened = true;
932 } catch (InterruptedException e) {
933 throw (InterruptedIOException)new InterruptedIOException().initCause(e);
934 } catch (ExecutionException e) {
935 throw new IOException(e.getCause());
936 } finally {
937 storeOpenerThreadPool.shutdownNow();
938 if (!allStoresOpened) {
939
940 LOG.error("Could not initialize all stores for the region=" + this);
941 for (Store store : this.stores.values()) {
942 try {
943 store.close();
944 } catch (IOException e) {
945 LOG.warn(e.getMessage());
946 }
947 }
948 }
949 }
950 }
951 if (ServerRegionReplicaUtil.shouldReplayRecoveredEdits(this) && !warmupOnly) {
952
953 maxSeqId = Math.max(maxSeqId, replayRecoveredEditsIfAny(
954 this.fs.getRegionDir(), maxSeqIdInStores, reporter, status));
955 }
956 maxSeqId = Math.max(maxSeqId, maxMemstoreTS + 1);
957 mvcc.initialize(maxSeqId);
958 return maxSeqId;
959 }
960
961 private void initializeWarmup(final CancelableProgressable reporter) throws IOException {
962 MonitoredTask status = TaskMonitor.get().createStatus("Initializing region " + this);
963
964
965 status.setStatus("Warming up all the Stores");
966 initializeRegionStores(reporter, status, true);
967 }
968
969
970
971
972 private NavigableMap<byte[], List<Path>> getStoreFiles() {
973 NavigableMap<byte[], List<Path>> allStoreFiles =
974 new TreeMap<byte[], List<Path>>(Bytes.BYTES_COMPARATOR);
975 for (Store store: getStores()) {
976 Collection<StoreFile> storeFiles = store.getStorefiles();
977 if (storeFiles == null) continue;
978 List<Path> storeFileNames = new ArrayList<Path>();
979 for (StoreFile storeFile: storeFiles) {
980 storeFileNames.add(storeFile.getPath());
981 }
982 allStoreFiles.put(store.getFamily().getName(), storeFileNames);
983 }
984 return allStoreFiles;
985 }
986
987 private void writeRegionOpenMarker(WAL wal, long openSeqId) throws IOException {
988 Map<byte[], List<Path>> storeFiles = getStoreFiles();
989 RegionEventDescriptor regionOpenDesc = ProtobufUtil.toRegionEventDescriptor(
990 RegionEventDescriptor.EventType.REGION_OPEN, getRegionInfo(), openSeqId,
991 getRegionServerServices().getServerName(), storeFiles);
992 WALUtil.writeRegionEventMarker(wal, getTableDesc(), getRegionInfo(), regionOpenDesc,
993 getSequenceId());
994 }
995
996 private void writeRegionCloseMarker(WAL wal) throws IOException {
997 Map<byte[], List<Path>> storeFiles = getStoreFiles();
998 RegionEventDescriptor regionEventDesc = ProtobufUtil.toRegionEventDescriptor(
999 RegionEventDescriptor.EventType.REGION_CLOSE, getRegionInfo(), getSequenceId().get(),
1000 getRegionServerServices().getServerName(), storeFiles);
1001 WALUtil.writeRegionEventMarker(wal, getTableDesc(), getRegionInfo(), regionEventDesc,
1002 getSequenceId());
1003
1004
1005
1006
1007 if (this.fs.getFileSystem().exists(this.fs.getRegionDir())) {
1008 WALSplitter.writeRegionSequenceIdFile(this.fs.getFileSystem(), this.fs.getRegionDir(),
1009 getSequenceId().get(), 0);
1010 }
1011 }
1012
1013
1014
1015
1016 public boolean hasReferences() {
1017 for (Store store : this.stores.values()) {
1018 if (store.hasReferences()) return true;
1019 }
1020 return false;
1021 }
1022
1023 @Override
1024 public HDFSBlocksDistribution getHDFSBlocksDistribution() {
1025 HDFSBlocksDistribution hdfsBlocksDistribution =
1026 new HDFSBlocksDistribution();
1027 synchronized (this.stores) {
1028 for (Store store : this.stores.values()) {
1029 Collection<StoreFile> storeFiles = store.getStorefiles();
1030 if (storeFiles == null) continue;
1031 for (StoreFile sf : storeFiles) {
1032 HDFSBlocksDistribution storeFileBlocksDistribution =
1033 sf.getHDFSBlockDistribution();
1034 hdfsBlocksDistribution.add(storeFileBlocksDistribution);
1035 }
1036 }
1037 }
1038 return hdfsBlocksDistribution;
1039 }
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049 public static HDFSBlocksDistribution computeHDFSBlocksDistribution(final Configuration conf,
1050 final HTableDescriptor tableDescriptor, final HRegionInfo regionInfo) throws IOException {
1051 Path tablePath = FSUtils.getTableDir(FSUtils.getRootDir(conf), tableDescriptor.getTableName());
1052 return computeHDFSBlocksDistribution(conf, tableDescriptor, regionInfo, tablePath);
1053 }
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064 public static HDFSBlocksDistribution computeHDFSBlocksDistribution(final Configuration conf,
1065 final HTableDescriptor tableDescriptor, final HRegionInfo regionInfo, Path tablePath)
1066 throws IOException {
1067 HDFSBlocksDistribution hdfsBlocksDistribution = new HDFSBlocksDistribution();
1068 FileSystem fs = tablePath.getFileSystem(conf);
1069
1070 HRegionFileSystem regionFs = new HRegionFileSystem(conf, fs, tablePath, regionInfo);
1071 for (HColumnDescriptor family: tableDescriptor.getFamilies()) {
1072 Collection<StoreFileInfo> storeFiles = regionFs.getStoreFiles(family.getNameAsString());
1073 if (storeFiles == null) continue;
1074 for (StoreFileInfo storeFileInfo : storeFiles) {
1075 hdfsBlocksDistribution.add(storeFileInfo.computeHDFSBlocksDistribution(fs));
1076 }
1077 }
1078 return hdfsBlocksDistribution;
1079 }
1080
1081
1082
1083
1084
1085
1086 public long addAndGetGlobalMemstoreSize(long memStoreSize) {
1087 if (this.rsAccounting != null) {
1088 rsAccounting.addAndGetGlobalMemstoreSize(memStoreSize);
1089 }
1090 long size = this.memstoreSize.addAndGet(memStoreSize);
1091
1092
1093 if (size < 0) {
1094 LOG.error("Asked to modify this region's (" + this.toString()
1095 + ") memstoreSize to a negative value which is incorrect. Current memstoreSize="
1096 + (size-memStoreSize) + ", delta=" + memStoreSize, new Exception());
1097 }
1098 return size;
1099 }
1100
1101 @Override
1102 public HRegionInfo getRegionInfo() {
1103 return this.fs.getRegionInfo();
1104 }
1105
1106
1107
1108
1109
1110 RegionServerServices getRegionServerServices() {
1111 return this.rsServices;
1112 }
1113
1114 @Override
1115 public long getReadRequestsCount() {
1116 return readRequestsCount.get();
1117 }
1118
1119 @Override
1120 public void updateReadRequestsCount(long i) {
1121 readRequestsCount.add(i);
1122 }
1123
1124 @Override
1125 public long getWriteRequestsCount() {
1126 return writeRequestsCount.get();
1127 }
1128
1129 @Override
1130 public void updateWriteRequestsCount(long i) {
1131 writeRequestsCount.add(i);
1132 }
1133
1134 @Override
1135 public long getMemstoreSize() {
1136 return memstoreSize.get();
1137 }
1138
1139 @Override
1140 public long getNumMutationsWithoutWAL() {
1141 return numMutationsWithoutWAL.get();
1142 }
1143
1144 @Override
1145 public long getDataInMemoryWithoutWAL() {
1146 return dataInMemoryWithoutWAL.get();
1147 }
1148
1149 @Override
1150 public long getBlockedRequestsCount() {
1151 return blockedRequestsCount.get();
1152 }
1153
1154 @Override
1155 public long getCheckAndMutateChecksPassed() {
1156 return checkAndMutateChecksPassed.get();
1157 }
1158
1159 @Override
1160 public long getCheckAndMutateChecksFailed() {
1161 return checkAndMutateChecksFailed.get();
1162 }
1163
1164 @Override
1165 public MetricsRegion getMetrics() {
1166 return metricsRegion;
1167 }
1168
1169 @Override
1170 public boolean isClosed() {
1171 return this.closed.get();
1172 }
1173
1174 @Override
1175 public boolean isClosing() {
1176 return this.closing.get();
1177 }
1178
1179 @Override
1180 public boolean isReadOnly() {
1181 return this.writestate.isReadOnly();
1182 }
1183
1184
1185
1186
1187 public void setRecovering(boolean newState) {
1188 boolean wasRecovering = this.isRecovering;
1189
1190
1191 if (wal != null && getRegionServerServices() != null && !writestate.readOnly
1192 && wasRecovering && !newState) {
1193
1194
1195 boolean forceFlush = getTableDesc().getRegionReplication() > 1;
1196
1197
1198 MonitoredTask status = TaskMonitor.get().createStatus(
1199 "Flushing region " + this + " because recovery is finished");
1200 try {
1201 if (forceFlush) {
1202 internalFlushcache(status);
1203 }
1204
1205 status.setStatus("Writing region open event marker to WAL because recovery is finished");
1206 try {
1207 long seqId = openSeqNum;
1208
1209 if (wal != null) {
1210 seqId = getNextSequenceId(wal);
1211 }
1212 writeRegionOpenMarker(wal, seqId);
1213 } catch (IOException e) {
1214
1215
1216 LOG.warn(getRegionInfo().getEncodedName() + " : was not able to write region opening "
1217 + "event to WAL, continueing", e);
1218 }
1219 } catch (IOException ioe) {
1220
1221
1222 LOG.warn(getRegionInfo().getEncodedName() + " : was not able to flush "
1223 + "event to WAL, continueing", ioe);
1224 } finally {
1225 status.cleanup();
1226 }
1227 }
1228
1229 this.isRecovering = newState;
1230 if (wasRecovering && !isRecovering) {
1231
1232 coprocessorHost.postLogReplay();
1233 }
1234 }
1235
1236 @Override
1237 public boolean isRecovering() {
1238 return this.isRecovering;
1239 }
1240
1241 @Override
1242 public boolean isAvailable() {
1243 return !isClosed() && !isClosing();
1244 }
1245
1246
1247 public boolean isSplittable() {
1248 return isAvailable() && !hasReferences();
1249 }
1250
1251
1252
1253
1254 public boolean isMergeable() {
1255 if (!isAvailable()) {
1256 LOG.debug("Region " + getRegionInfo().getRegionNameAsString()
1257 + " is not mergeable because it is closing or closed");
1258 return false;
1259 }
1260 if (hasReferences()) {
1261 LOG.debug("Region " + getRegionInfo().getRegionNameAsString()
1262 + " is not mergeable because it has references");
1263 return false;
1264 }
1265
1266 return true;
1267 }
1268
1269 public boolean areWritesEnabled() {
1270 synchronized(this.writestate) {
1271 return this.writestate.writesEnabled;
1272 }
1273 }
1274
1275 public MultiVersionConsistencyControl getMVCC() {
1276 return mvcc;
1277 }
1278
1279 @Override
1280 public long getMaxFlushedSeqId() {
1281 return maxFlushedSeqId;
1282 }
1283
1284 @Override
1285 public long getReadpoint(IsolationLevel isolationLevel) {
1286 if (isolationLevel == IsolationLevel.READ_UNCOMMITTED) {
1287
1288 return Long.MAX_VALUE;
1289 }
1290 return mvcc.memstoreReadPoint();
1291 }
1292
1293 @Override
1294 public boolean isLoadingCfsOnDemandDefault() {
1295 return this.isLoadingCfsOnDemandDefault;
1296 }
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314 public Map<byte[], List<StoreFile>> close() throws IOException {
1315 return close(false);
1316 }
1317
/** Monitor held by {@code close(boolean)} for the duration of a close. */
private final Object closeLock = new Object();

/** Configuration key for the periodic memstore flush interval. */
public static final String MEMSTORE_PERIODIC_FLUSH_INTERVAL =
    "hbase.regionserver.optionalcacheflushinterval";
/** Default periodic flush interval (presumably milliseconds, i.e. one hour; confirm). */
public static final int DEFAULT_CACHE_FLUSH_INTERVAL = 3600000;
/** Flush interval that {@code shouldFlush()} applies to the default replica of a meta region. */
public static final int META_CACHE_FLUSH_INTERVAL = 300000;

/** Configuration key for the flush-per-changes limit. */
public static final String MEMSTORE_FLUSH_PER_CHANGES =
    "hbase.regionserver.flush.per.changes";
/** Default for {@link #MEMSTORE_FLUSH_PER_CHANGES}. */
public static final long DEFAULT_FLUSH_PER_CHANGES = 30000000;

/**
 * Presumably the largest accepted value for {@link #MEMSTORE_FLUSH_PER_CHANGES}; confirm
 * against where the setting is read.
 */
public static final long MAX_FLUSH_PER_CHANGES = 1000000000;
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354 public Map<byte[], List<StoreFile>> close(final boolean abort) throws IOException {
1355
1356
1357 MonitoredTask status = TaskMonitor.get().createStatus(
1358 "Closing region " + this +
1359 (abort ? " due to abort" : ""));
1360
1361 status.setStatus("Waiting for close lock");
1362 try {
1363 synchronized (closeLock) {
1364 return doClose(abort, status);
1365 }
1366 } finally {
1367 status.cleanup();
1368 }
1369 }
1370
1371
1372
1373
1374 @VisibleForTesting
1375 public void setClosing(boolean closing) {
1376 this.closing.set(closing);
1377 }
1378
1379 private Map<byte[], List<StoreFile>> doClose(final boolean abort, MonitoredTask status)
1380 throws IOException {
1381 if (isClosed()) {
1382 LOG.warn("Region " + this + " already closed");
1383 return null;
1384 }
1385
1386 if (coprocessorHost != null) {
1387 status.setStatus("Running coprocessor pre-close hooks");
1388 this.coprocessorHost.preClose(abort);
1389 }
1390
1391 status.setStatus("Disabling compacts and flushes for region");
1392 boolean canFlush = true;
1393 synchronized (writestate) {
1394
1395
1396 canFlush = !writestate.readOnly;
1397 writestate.writesEnabled = false;
1398 LOG.debug("Closing " + this + ": disabling compactions & flushes");
1399 waitForFlushesAndCompactions();
1400 }
1401
1402
1403
1404 if (!abort && worthPreFlushing() && canFlush) {
1405 status.setStatus("Pre-flushing region before close");
1406 LOG.info("Running close preflush of " + getRegionInfo().getRegionNameAsString());
1407 try {
1408 internalFlushcache(status);
1409 } catch (IOException ioe) {
1410
1411 status.setStatus("Failed pre-flush " + this + "; " + ioe.getMessage());
1412 }
1413 }
1414
1415
1416 lock.writeLock().lock();
1417 this.closing.set(true);
1418 status.setStatus("Disabling writes for close");
1419 try {
1420 if (this.isClosed()) {
1421 status.abort("Already got closed by another process");
1422
1423 return null;
1424 }
1425 LOG.debug("Updates disabled for region " + this);
1426
1427 if (!abort && canFlush) {
1428 int flushCount = 0;
1429 while (this.memstoreSize.get() > 0) {
1430 try {
1431 if (flushCount++ > 0) {
1432 int actualFlushes = flushCount - 1;
1433 if (actualFlushes > 5) {
1434
1435
1436 throw new DroppedSnapshotException("Failed clearing memory after " +
1437 actualFlushes + " attempts on region: " +
1438 Bytes.toStringBinary(getRegionInfo().getRegionName()));
1439 }
1440 LOG.info("Running extra flush, " + actualFlushes +
1441 " (carrying snapshot?) " + this);
1442 }
1443 internalFlushcache(status);
1444 } catch (IOException ioe) {
1445 status.setStatus("Failed flush " + this + ", putting online again");
1446 synchronized (writestate) {
1447 writestate.writesEnabled = true;
1448 }
1449
1450 throw ioe;
1451 }
1452 }
1453 }
1454
1455 Map<byte[], List<StoreFile>> result =
1456 new TreeMap<byte[], List<StoreFile>>(Bytes.BYTES_COMPARATOR);
1457 if (!stores.isEmpty()) {
1458
1459 ThreadPoolExecutor storeCloserThreadPool =
1460 getStoreOpenAndCloseThreadPool("StoreCloserThread-" +
1461 getRegionInfo().getRegionNameAsString());
1462 CompletionService<Pair<byte[], Collection<StoreFile>>> completionService =
1463 new ExecutorCompletionService<Pair<byte[], Collection<StoreFile>>>(storeCloserThreadPool);
1464
1465
1466 for (final Store store : stores.values()) {
1467 long flushableSize = store.getFlushableSize();
1468 if (!(abort || flushableSize == 0 || writestate.readOnly)) {
1469 getRegionServerServices().abort("Assertion failed while closing store "
1470 + getRegionInfo().getRegionNameAsString() + " " + store
1471 + ". flushableSize expected=0, actual= " + flushableSize
1472 + ". Current memstoreSize=" + getMemstoreSize() + ". Maybe a coprocessor "
1473 + "operation failed and left the memstore in a partially updated state.", null);
1474 }
1475 completionService
1476 .submit(new Callable<Pair<byte[], Collection<StoreFile>>>() {
1477 @Override
1478 public Pair<byte[], Collection<StoreFile>> call() throws IOException {
1479 return new Pair<byte[], Collection<StoreFile>>(
1480 store.getFamily().getName(), store.close());
1481 }
1482 });
1483 }
1484 try {
1485 for (int i = 0; i < stores.size(); i++) {
1486 Future<Pair<byte[], Collection<StoreFile>>> future = completionService.take();
1487 Pair<byte[], Collection<StoreFile>> storeFiles = future.get();
1488 List<StoreFile> familyFiles = result.get(storeFiles.getFirst());
1489 if (familyFiles == null) {
1490 familyFiles = new ArrayList<StoreFile>();
1491 result.put(storeFiles.getFirst(), familyFiles);
1492 }
1493 familyFiles.addAll(storeFiles.getSecond());
1494 }
1495 } catch (InterruptedException e) {
1496 throw (InterruptedIOException)new InterruptedIOException().initCause(e);
1497 } catch (ExecutionException e) {
1498 throw new IOException(e.getCause());
1499 } finally {
1500 storeCloserThreadPool.shutdownNow();
1501 }
1502 }
1503
1504 status.setStatus("Writing region close event to WAL");
1505 if (!abort && wal != null && getRegionServerServices() != null && !writestate.readOnly) {
1506 writeRegionCloseMarker(wal);
1507 }
1508
1509 this.closed.set(true);
1510 if (!canFlush) {
1511 addAndGetGlobalMemstoreSize(-memstoreSize.get());
1512 } else if (memstoreSize.get() != 0) {
1513 LOG.error("Memstore size is " + memstoreSize.get());
1514 }
1515 if (coprocessorHost != null) {
1516 status.setStatus("Running coprocessor post-close hooks");
1517 this.coprocessorHost.postClose(abort);
1518 }
1519 if (this.metricsRegion != null) {
1520 this.metricsRegion.close();
1521 }
1522 if (this.metricsRegionWrapper != null) {
1523 Closeables.closeQuietly(this.metricsRegionWrapper);
1524 }
1525 status.markComplete("Closed");
1526 LOG.info("Closed " + this);
1527 return result;
1528 } finally {
1529 lock.writeLock().unlock();
1530 }
1531 }
1532
1533 @Override
1534 public void waitForFlushesAndCompactions() {
1535 synchronized (writestate) {
1536 if (this.writestate.readOnly) {
1537
1538
1539 return;
1540 }
1541 boolean interrupted = false;
1542 try {
1543 while (writestate.compacting > 0 || writestate.flushing) {
1544 LOG.debug("waiting for " + writestate.compacting + " compactions"
1545 + (writestate.flushing ? " & cache flush" : "") + " to complete for region " + this);
1546 try {
1547 writestate.wait();
1548 } catch (InterruptedException iex) {
1549
1550 LOG.warn("Interrupted while waiting");
1551 interrupted = true;
1552 }
1553 }
1554 } finally {
1555 if (interrupted) {
1556 Thread.currentThread().interrupt();
1557 }
1558 }
1559 }
1560 }
1561
1562 protected ThreadPoolExecutor getStoreOpenAndCloseThreadPool(
1563 final String threadNamePrefix) {
1564 int numStores = Math.max(1, this.htableDescriptor.getFamilies().size());
1565 int maxThreads = Math.min(numStores,
1566 conf.getInt(HConstants.HSTORE_OPEN_AND_CLOSE_THREADS_MAX,
1567 HConstants.DEFAULT_HSTORE_OPEN_AND_CLOSE_THREADS_MAX));
1568 return getOpenAndCloseThreadPool(maxThreads, threadNamePrefix);
1569 }
1570
1571 protected ThreadPoolExecutor getStoreFileOpenAndCloseThreadPool(
1572 final String threadNamePrefix) {
1573 int numStores = Math.max(1, this.htableDescriptor.getFamilies().size());
1574 int maxThreads = Math.max(1,
1575 conf.getInt(HConstants.HSTORE_OPEN_AND_CLOSE_THREADS_MAX,
1576 HConstants.DEFAULT_HSTORE_OPEN_AND_CLOSE_THREADS_MAX)
1577 / numStores);
1578 return getOpenAndCloseThreadPool(maxThreads, threadNamePrefix);
1579 }
1580
1581 static ThreadPoolExecutor getOpenAndCloseThreadPool(int maxThreads,
1582 final String threadNamePrefix) {
1583 return Threads.getBoundedCachedThreadPool(maxThreads, 30L, TimeUnit.SECONDS,
1584 new ThreadFactory() {
1585 private int count = 1;
1586
1587 @Override
1588 public Thread newThread(Runnable r) {
1589 return new Thread(r, threadNamePrefix + "-" + count++);
1590 }
1591 });
1592 }
1593
1594
1595
1596
1597 private boolean worthPreFlushing() {
1598 return this.memstoreSize.get() >
1599 this.conf.getLong("hbase.hregion.preclose.flush.size", 1024 * 1024 * 5);
1600 }
1601
1602
1603
1604
1605
1606 @Override
1607 public HTableDescriptor getTableDesc() {
1608 return this.htableDescriptor;
1609 }
1610
1611
1612 public WAL getWAL() {
1613 return this.wal;
1614 }
1615
1616
1617
1618
1619
1620
1621
1622
1623 Configuration getBaseConf() {
1624 return this.baseConf;
1625 }
1626
1627
1628 public FileSystem getFilesystem() {
1629 return fs.getFileSystem();
1630 }
1631
1632
1633 public HRegionFileSystem getRegionFileSystem() {
1634 return this.fs;
1635 }
1636
1637 @Override
1638 public long getEarliestFlushTimeForAllStores() {
1639 return lastStoreFlushTimeMap.isEmpty() ? Long.MAX_VALUE : Collections.min(lastStoreFlushTimeMap
1640 .values());
1641 }
1642
1643 @Override
1644 public long getOldestHfileTs(boolean majorCompactioOnly) throws IOException {
1645 long result = Long.MAX_VALUE;
1646 for (Store store : getStores()) {
1647 Collection<StoreFile> storeFiles = store.getStorefiles();
1648 if (storeFiles == null) continue;
1649 for (StoreFile file : storeFiles) {
1650 StoreFile.Reader sfReader = file.getReader();
1651 if (sfReader == null) continue;
1652 HFile.Reader reader = sfReader.getHFileReader();
1653 if (reader == null) continue;
1654 if (majorCompactioOnly) {
1655 byte[] val = reader.loadFileInfo().get(StoreFile.MAJOR_COMPACTION_KEY);
1656 if (val == null) continue;
1657 if (val == null || !Bytes.toBoolean(val)) {
1658 continue;
1659 }
1660 }
1661 result = Math.min(result, reader.getFileContext().getFileCreateTime());
1662 }
1663 }
1664 return result == Long.MAX_VALUE ? 0 : result;
1665 }
1666
1667 RegionLoad.Builder setCompleteSequenceId(RegionLoad.Builder regionLoadBldr) {
1668 long lastFlushOpSeqIdLocal = this.lastFlushOpSeqId;
1669 byte[] encodedRegionName = this.getRegionInfo().getEncodedNameAsBytes();
1670 regionLoadBldr.clearStoreCompleteSequenceId();
1671 for (byte[] familyName : this.stores.keySet()) {
1672 long oldestUnflushedSeqId = this.wal.getEarliestMemstoreSeqNum(encodedRegionName, familyName);
1673
1674
1675 regionLoadBldr.addStoreCompleteSequenceId(StoreSequenceId
1676 .newBuilder()
1677 .setFamilyName(ByteString.copyFrom(familyName))
1678 .setSequenceId(
1679 oldestUnflushedSeqId < 0 ? lastFlushOpSeqIdLocal : oldestUnflushedSeqId - 1).build());
1680 }
1681 return regionLoadBldr.setCompleteSequenceId(getMaxFlushedSeqId());
1682 }
1683
1684
1685
1686
1687
1688
1689
1690
1691
1692 public long getLargestHStoreSize() {
1693 long size = 0;
1694 for (Store h : stores.values()) {
1695 long storeSize = h.getSize();
1696 if (storeSize > size) {
1697 size = storeSize;
1698 }
1699 }
1700 return size;
1701 }
1702
1703
1704
1705
1706 public KeyValue.KVComparator getComparator() {
1707 return this.comparator;
1708 }
1709
1710
1711
1712
1713
/**
 * Hook invoked by {@code compact(CompactionContext, Store, ...)} just before the store is
 * compacted. This implementation does nothing; being protected, it is presumably meant to be
 * overridden by subclasses.
 *
 * @throws IOException declared for overriding implementations; never thrown here
 */
protected void doRegionCompactionPrep() throws IOException {
}
1716
1717 @Override
1718 public void triggerMajorCompaction() throws IOException {
1719 for (Store s : getStores()) {
1720 s.triggerMajorCompaction();
1721 }
1722 }
1723
1724 @Override
1725 public void compact(final boolean majorCompaction) throws IOException {
1726 if (majorCompaction) {
1727 triggerMajorCompaction();
1728 }
1729 for (Store s : getStores()) {
1730 CompactionContext compaction = s.requestCompaction();
1731 if (compaction != null) {
1732 CompactionThroughputController controller = null;
1733 if (rsServices != null) {
1734 controller = CompactionThroughputControllerFactory.create(rsServices, conf);
1735 }
1736 if (controller == null) {
1737 controller = NoLimitCompactionThroughputController.INSTANCE;
1738 }
1739 compact(compaction, s, controller, null);
1740 }
1741 }
1742 }
1743
1744
1745
1746
1747
1748
1749
1750 public void compactStores() throws IOException {
1751 for (Store s : getStores()) {
1752 CompactionContext compaction = s.requestCompaction();
1753 if (compaction != null) {
1754 compact(compaction, s, NoLimitCompactionThroughputController.INSTANCE, null);
1755 }
1756 }
1757 }
1758
1759
1760
1761
1762
1763
1764
1765 @VisibleForTesting
1766 void compactStore(byte[] family, CompactionThroughputController throughputController)
1767 throws IOException {
1768 Store s = getStore(family);
1769 CompactionContext compaction = s.requestCompaction();
1770 if (compaction != null) {
1771 compact(compaction, s, throughputController, null);
1772 }
1773 }
1774
1775
1776
1777
1778
1779
1780
1781
1782
1783
1784
1785
1786
1787
1788
1789
1790 public boolean compact(CompactionContext compaction, Store store,
1791 CompactionThroughputController throughputController) throws IOException {
1792 return compact(compaction, store, throughputController, null);
1793 }
1794
/**
 * Runs a previously selected compaction on one store of this region while holding the region
 * read lock.
 *
 * <p>The compaction is skipped, and {@code false} returned, when the region is closing or
 * closed, when {@code store} is no longer the instance registered for its column family, when
 * writes are disabled, or when the compaction is interrupted. Unless the request has been
 * handed to {@code store.compact}, a skipped request is cancelled on the store.
 *
 * @param compaction the compaction to run; must have a non-empty selection (asserted)
 * @param store the store to compact
 * @param throughputController controller passed on to the store's compaction
 * @param user user passed on to the store's compaction; may be {@code null}
 * @return {@code true} if the compaction ran to completion, {@code false} if it was skipped or
 *         interrupted
 * @throws IOException if the compaction fails
 */
public boolean compact(CompactionContext compaction, Store store,
    CompactionThroughputController throughputController, User user) throws IOException {
  assert compaction != null && compaction.hasSelection();
  assert !compaction.getRequest().getFiles().isEmpty();
  // Cheap check before taking the lock; the closed flag is checked again under the lock.
  if (this.closing.get() || this.closed.get()) {
    LOG.debug("Skipping compaction on " + this + " because closing/closed");
    store.cancelRequestedCompaction(compaction);
    return false;
  }
  MonitoredTask status = null;
  // While true, the outer finally block cancels the request on the store.
  boolean requestNeedsCancellation = true;

  lock.readLock().lock();
  try {
    byte[] cf = Bytes.toBytes(store.getColumnFamilyName());
    // Identity comparison: the store must be the very instance currently registered.
    if (stores.get(cf) != store) {
      LOG.warn("Store " + store.getColumnFamilyName() + " on region " + this
          + " has been re-instantiated, cancel this compaction request. "
          + " It may be caused by the roll back of split transaction");
      return false;
    }

    status = TaskMonitor.get().createStatus("Compacting " + store + " in " + this);
    if (this.closed.get()) {
      String msg = "Skipping compaction on " + this + " because closed";
      LOG.debug(msg);
      status.abort(msg);
      return false;
    }
    // Records whether the compacting counter was incremented and so must be decremented.
    boolean wasStateSet = false;
    try {
      synchronized (writestate) {
        if (writestate.writesEnabled) {
          wasStateSet = true;
          ++writestate.compacting;
        } else {
          String msg = "NOT compacting region " + this + ". Writes disabled.";
          LOG.info(msg);
          status.abort(msg);
          return false;
        }
      }
      LOG.info("Starting compaction on " + store + " in region " + this
          + (compaction.getRequest().isOffPeak()?" as an off-peak compaction":""));
      doRegionCompactionPrep();
      try {
        status.setStatus("Compacting store " + store);
        // From here on the finally block must not cancel the request; presumably
        // store.compact takes over responsibility for it (confirm against Store).
        requestNeedsCancellation = false;
        store.compact(compaction, throughputController, user);
      } catch (InterruptedIOException iioe) {
        String msg = "compaction interrupted";
        LOG.info(msg, iioe);
        status.abort(msg);
        return false;
      }
    } finally {
      if (wasStateSet) {
        synchronized (writestate) {
          --writestate.compacting;
          // Wake up threads waiting on the write state once no compaction is left.
          if (writestate.compacting <= 0) {
            writestate.notifyAll();
          }
        }
      }
    }
    status.markComplete("Compaction complete");
    return true;
  } finally {
    try {
      if (requestNeedsCancellation) store.cancelRequestedCompaction(compaction);
      if (status != null) status.cleanup();
    } finally {
      // Nested finally: the read lock is released even if the cleanup above throws.
      lock.readLock().unlock();
    }
  }
}
1873
1874 @Override
1875 public FlushResult flush(boolean force) throws IOException {
1876 return flushcache(force, false);
1877 }
1878
1879
1880
1881
1882
1883
1884
1885
1886
1887
1888
1889
1890
1891
1892
1893
1894
1895
1896
1897
1898
1899
1900
/**
 * Flushes the memstore of this region while holding the region read lock.
 *
 * <p>The flush is refused with a {@code CANNOT_FLUSH} result when the region is closing or
 * closed, when another flush is already in progress, or when writes are disabled. Coprocessor
 * pre-flush and post-flush hooks are run when a coprocessor host is present.
 *
 * @param forceFlushAllStores if {@code true} all stores are flushed, otherwise the flush
 *        policy selects the stores to flush
 * @param writeFlushRequestWalMarker passed on to the internal flush
 * @return the result of the flush, or a {@code CANNOT_FLUSH} result if it was refused
 * @throws IOException if the flush or a coprocessor hook fails
 */
public FlushResult flushcache(boolean forceFlushAllStores, boolean writeFlushRequestWalMarker)
    throws IOException {
  // Cheap check before taking the lock; the closed flag is checked again under the lock.
  if (this.closing.get()) {
    String msg = "Skipping flush on " + this + " because closing";
    LOG.debug(msg);
    return new FlushResultImpl(FlushResult.Result.CANNOT_FLUSH, msg, false);
  }
  MonitoredTask status = TaskMonitor.get().createStatus("Flushing " + this);
  status.setStatus("Acquiring readlock on region");
  lock.readLock().lock();
  try {
    if (this.closed.get()) {
      String msg = "Skipping flush on " + this + " because closed";
      LOG.debug(msg);
      status.abort(msg);
      return new FlushResultImpl(FlushResult.Result.CANNOT_FLUSH, msg, false);
    }
    if (coprocessorHost != null) {
      status.setStatus("Running coprocessor pre-flush hooks");
      coprocessorHost.preFlush();
    }
    // Reset the without-WAL counters before flushing.
    if (numMutationsWithoutWAL.get() > 0) {
      numMutationsWithoutWAL.set(0);
      dataInMemoryWithoutWAL.set(0);
    }
    synchronized (writestate) {
      // Claim the flushing flag; only one flush may run at a time.
      if (!writestate.flushing && writestate.writesEnabled) {
        this.writestate.flushing = true;
      } else {
        if (LOG.isDebugEnabled()) {
          LOG.debug("NOT flushing memstore for region " + this
              + ", flushing=" + writestate.flushing + ", writesEnabled="
              + writestate.writesEnabled);
        }
        String msg = "Not flushing since "
            + (writestate.flushing ? "already flushing"
            : "writes not enabled");
        status.abort(msg);
        return new FlushResultImpl(FlushResult.Result.CANNOT_FLUSH, msg, false);
      }
    }

    try {
      Collection<Store> specificStoresToFlush =
          forceFlushAllStores ? stores.values() : flushPolicy.selectStoresToFlush();
      FlushResult fs = internalFlushcache(specificStoresToFlush,
          status, writeFlushRequestWalMarker);

      if (coprocessorHost != null) {
        status.setStatus("Running post-flush coprocessor hooks");
        coprocessorHost.postFlush();
      }

      status.markComplete("Flush successful");
      return fs;
    } finally {
      // Release the flushing flag and wake up threads waiting on the write state.
      synchronized (writestate) {
        writestate.flushing = false;
        this.writestate.flushRequested = false;
        writestate.notifyAll();
      }
    }
  } finally {
    lock.readLock().unlock();
    status.cleanup();
  }
}
1972
1973
1974
1975
1976
1977
1978
1979
1980 boolean shouldFlushStore(Store store) {
1981 long maxFlushedSeqId =
1982 this.wal.getEarliestMemstoreSeqNum(getRegionInfo().getEncodedNameAsBytes(), store
1983 .getFamily().getName()) - 1;
1984 if (maxFlushedSeqId > 0 && maxFlushedSeqId + flushPerChanges < sequenceId.get()) {
1985 if (LOG.isDebugEnabled()) {
1986 LOG.debug("Column Family: " + store.getColumnFamilyName() + " of region " + this
1987 + " will be flushed because its max flushed seqId(" + maxFlushedSeqId
1988 + ") is far away from current(" + sequenceId.get() + "), max allowed is "
1989 + flushPerChanges);
1990 }
1991 return true;
1992 }
1993 if (flushCheckInterval <= 0) {
1994 return false;
1995 }
1996 long now = EnvironmentEdgeManager.currentTime();
1997 if (store.timeOfOldestEdit() < now - flushCheckInterval) {
1998 if (LOG.isDebugEnabled()) {
1999 LOG.debug("Column Family: " + store.getColumnFamilyName() + " of region " + this
2000 + " will be flushed because time of its oldest edit (" + store.timeOfOldestEdit()
2001 + ") is far away from now(" + now + "), max allowed is " + flushCheckInterval);
2002 }
2003 return true;
2004 }
2005 return false;
2006 }
2007
2008
2009
2010
2011 boolean shouldFlush() {
2012
2013 if (this.maxFlushedSeqId > 0
2014 && (this.maxFlushedSeqId + this.flushPerChanges < this.sequenceId.get())) {
2015 return true;
2016 }
2017 long modifiedFlushCheckInterval = flushCheckInterval;
2018 if (getRegionInfo().isMetaRegion() &&
2019 getRegionInfo().getReplicaId() == HRegionInfo.DEFAULT_REPLICA_ID) {
2020 modifiedFlushCheckInterval = META_CACHE_FLUSH_INTERVAL;
2021 }
2022 if (modifiedFlushCheckInterval <= 0) {
2023 return false;
2024 }
2025 long now = EnvironmentEdgeManager.currentTime();
2026
2027 if ((now - getEarliestFlushTimeForAllStores() < modifiedFlushCheckInterval)) {
2028 return false;
2029 }
2030
2031
2032 for (Store s : getStores()) {
2033 if (s.timeOfOldestEdit() < now - modifiedFlushCheckInterval) {
2034
2035 return true;
2036 }
2037 }
2038 return false;
2039 }
2040
2041
2042
2043
2044
2045
2046 private FlushResult internalFlushcache(MonitoredTask status)
2047 throws IOException {
2048 return internalFlushcache(stores.values(), status, false);
2049 }
2050
2051
2052
2053
2054
2055
2056 private FlushResult internalFlushcache(final Collection<Store> storesToFlush,
2057 MonitoredTask status, boolean writeFlushWalMarker) throws IOException {
2058 return internalFlushcache(this.wal, HConstants.NO_SEQNUM, storesToFlush,
2059 status, writeFlushWalMarker);
2060 }
2061
2062
2063
2064
2065
2066
2067
2068
2069
2070
2071
2072
2073
2074
2075
2076
2077
2078
2079
2080
2081
2082
2083
2084
2085
2086
2087
2088
2089
2090 protected FlushResult internalFlushcache(final WAL wal, final long myseqid,
2091 final Collection<Store> storesToFlush, MonitoredTask status, boolean writeFlushWalMarker)
2092 throws IOException {
2093 PrepareFlushResult result
2094 = internalPrepareFlushCache(wal, myseqid, storesToFlush, status, writeFlushWalMarker);
2095 if (result.result == null) {
2096 return internalFlushCacheAndCommit(wal, status, result, storesToFlush);
2097 } else {
2098 return result.result;
2099 }
2100 }
2101
2102 protected PrepareFlushResult internalPrepareFlushCache(
2103 final WAL wal, final long myseqid, final Collection<Store> storesToFlush,
2104 MonitoredTask status, boolean writeFlushWalMarker)
2105 throws IOException {
2106
2107 if (this.rsServices != null && this.rsServices.isAborted()) {
2108
2109 throw new IOException("Aborting flush because server is aborted...");
2110 }
2111 final long startTime = EnvironmentEdgeManager.currentTime();
2112
2113 if (this.memstoreSize.get() <= 0) {
2114
2115
2116 MultiVersionConsistencyControl.WriteEntry writeEntry = null;
2117 this.updatesLock.writeLock().lock();
2118 try {
2119 if (this.memstoreSize.get() <= 0) {
2120
2121
2122
2123
2124
2125
2126 if (wal != null) {
2127 writeEntry = mvcc.beginMemstoreInsert();
2128 long flushOpSeqId = getNextSequenceId(wal);
2129 FlushResult flushResult = new FlushResultImpl(
2130 FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY, flushOpSeqId, "Nothing to flush",
2131 writeFlushRequestMarkerToWAL(wal, writeFlushWalMarker));
2132 writeEntry.setWriteNumber(flushOpSeqId);
2133 mvcc.waitForPreviousTransactionsComplete(writeEntry);
2134 writeEntry = null;
2135 return new PrepareFlushResult(flushResult, myseqid);
2136 } else {
2137 return new PrepareFlushResult(
2138 new FlushResultImpl(FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY,
2139 "Nothing to flush", false),
2140 myseqid);
2141 }
2142 }
2143 } finally {
2144 this.updatesLock.writeLock().unlock();
2145 if (writeEntry != null) {
2146 mvcc.advanceMemstore(writeEntry);
2147 }
2148 }
2149 }
2150
2151 if (LOG.isInfoEnabled()) {
2152 LOG.info("Started memstore flush for " + this + ", current region memstore size "
2153 + StringUtils.byteDesc(this.memstoreSize.get()) + ", and " + storesToFlush.size() + "/"
2154 + stores.size() + " column families' memstores are being flushed."
2155 + ((wal != null) ? "" : "; wal is null, using passed sequenceid=" + myseqid));
2156
2157 if (this.stores.size() > storesToFlush.size()) {
2158 for (Store store: storesToFlush) {
2159 LOG.info("Flushing Column Family: " + store.getColumnFamilyName()
2160 + " which was occupying "
2161 + StringUtils.byteDesc(store.getMemStoreSize()) + " of memstore.");
2162 }
2163 }
2164 }
2165
2166
2167
2168
2169 MultiVersionConsistencyControl.WriteEntry writeEntry = null;
2170
2171
2172 status.setStatus("Obtaining lock to block concurrent updates");
2173
2174 this.updatesLock.writeLock().lock();
2175 status.setStatus("Preparing to flush by snapshotting stores in " +
2176 getRegionInfo().getEncodedName());
2177 long totalFlushableSizeOfFlushableStores = 0;
2178
2179 Set<byte[]> flushedFamilyNames = new HashSet<byte[]>();
2180 for (Store store: storesToFlush) {
2181 flushedFamilyNames.add(store.getFamily().getName());
2182 }
2183
2184 TreeMap<byte[], StoreFlushContext> storeFlushCtxs
2185 = new TreeMap<byte[], StoreFlushContext>(Bytes.BYTES_COMPARATOR);
2186 TreeMap<byte[], List<Path>> committedFiles = new TreeMap<byte[], List<Path>>(
2187 Bytes.BYTES_COMPARATOR);
2188 TreeMap<byte[], Long> storeFlushableSize
2189 = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
2190
2191
2192 long flushOpSeqId = HConstants.NO_SEQNUM;
2193
2194
2195 long flushedSeqId = HConstants.NO_SEQNUM;
2196 byte[] encodedRegionName = getRegionInfo().getEncodedNameAsBytes();
2197
2198 long trxId = 0;
2199 try {
2200 try {
2201 writeEntry = mvcc.beginMemstoreInsert();
2202
2203
2204
2205
2206
2207 mvcc.waitForPreviousTransactionsComplete(writeEntry);
2208
2209 writeEntry = null;
2210
2211 if (wal != null) {
2212 Long earliestUnflushedSequenceIdForTheRegion =
2213 wal.startCacheFlush(encodedRegionName, flushedFamilyNames);
2214 if (earliestUnflushedSequenceIdForTheRegion == null) {
2215
2216 String msg = this.getRegionInfo().getEncodedName() + " flush aborted; WAL closing.";
2217 status.setStatus(msg);
2218 return new PrepareFlushResult(
2219 new FlushResultImpl(FlushResult.Result.CANNOT_FLUSH, msg, false),
2220 myseqid);
2221 }
2222 flushOpSeqId = getNextSequenceId(wal);
2223
2224 flushedSeqId =
2225 earliestUnflushedSequenceIdForTheRegion.longValue() == HConstants.NO_SEQNUM?
2226 flushOpSeqId: earliestUnflushedSequenceIdForTheRegion.longValue() - 1;
2227 } else {
2228
2229 flushedSeqId = flushOpSeqId = myseqid;
2230 }
2231
2232 for (Store s : storesToFlush) {
2233 totalFlushableSizeOfFlushableStores += s.getFlushableSize();
2234 storeFlushCtxs.put(s.getFamily().getName(), s.createFlushContext(flushOpSeqId));
2235 committedFiles.put(s.getFamily().getName(), null);
2236 storeFlushableSize.put(s.getFamily().getName(), s.getFlushableSize());
2237 }
2238
2239
2240 if (wal != null && !writestate.readOnly) {
2241 FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.START_FLUSH,
2242 getRegionInfo(), flushOpSeqId, committedFiles);
2243
2244 trxId = WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(),
2245 desc, sequenceId, false);
2246 }
2247
2248
2249 for (StoreFlushContext flush : storeFlushCtxs.values()) {
2250 flush.prepare();
2251 }
2252 } catch (IOException ex) {
2253 if (wal != null) {
2254 if (trxId > 0) {
2255 try {
2256 FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.ABORT_FLUSH,
2257 getRegionInfo(), flushOpSeqId, committedFiles);
2258 WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(),
2259 desc, sequenceId, false);
2260 } catch (Throwable t) {
2261 LOG.warn("Received unexpected exception trying to write ABORT_FLUSH marker to WAL:" +
2262 StringUtils.stringifyException(t));
2263
2264 }
2265 }
2266
2267 wal.abortCacheFlush(this.getRegionInfo().getEncodedNameAsBytes());
2268 throw ex;
2269 }
2270 } finally {
2271 this.updatesLock.writeLock().unlock();
2272 }
2273 String s = "Finished memstore snapshotting " + this +
2274 ", syncing WAL and waiting on mvcc, flushsize=" + totalFlushableSizeOfFlushableStores;
2275 status.setStatus(s);
2276 if (LOG.isTraceEnabled()) LOG.trace(s);
2277
2278
2279 if (wal != null) {
2280 try {
2281 wal.sync();
2282 } catch (IOException ioe) {
2283 wal.abortCacheFlush(this.getRegionInfo().getEncodedNameAsBytes());
2284 throw ioe;
2285 }
2286 }
2287 } finally {
2288 if (writeEntry != null) {
2289
2290 mvcc.advanceMemstore(writeEntry);
2291 }
2292 }
2293 return new PrepareFlushResult(storeFlushCtxs, committedFiles, storeFlushableSize, startTime, flushOpSeqId,
2294 flushedSeqId, totalFlushableSizeOfFlushableStores);
2295 }
2296
2297
2298
2299
2300
2301
2302
2303 private boolean writeFlushRequestMarkerToWAL(WAL wal, boolean writeFlushWalMarker) {
2304 if (writeFlushWalMarker && wal != null && !writestate.readOnly) {
2305 FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.CANNOT_FLUSH,
2306 getRegionInfo(), -1, new TreeMap<byte[], List<Path>>());
2307 try {
2308 WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(),
2309 desc, sequenceId, true);
2310 return true;
2311 } catch (IOException e) {
2312 LOG.warn(getRegionInfo().getEncodedName() + " : "
2313 + "Received exception while trying to write the flush request to wal", e);
2314 }
2315 }
2316 return false;
2317 }
2318
2319 protected FlushResult internalFlushCacheAndCommit(
2320 final WAL wal, MonitoredTask status, final PrepareFlushResult prepareResult,
2321 final Collection<Store> storesToFlush)
2322 throws IOException {
2323
2324
2325 TreeMap<byte[], StoreFlushContext> storeFlushCtxs = prepareResult.storeFlushCtxs;
2326 TreeMap<byte[], List<Path>> committedFiles = prepareResult.committedFiles;
2327 long startTime = prepareResult.startTime;
2328 long flushOpSeqId = prepareResult.flushOpSeqId;
2329 long flushedSeqId = prepareResult.flushedSeqId;
2330 long totalFlushableSizeOfFlushableStores = prepareResult.totalFlushableSize;
2331
2332 String s = "Flushing stores of " + this;
2333 status.setStatus(s);
2334 if (LOG.isTraceEnabled()) LOG.trace(s);
2335
2336
2337
2338
2339
2340 boolean compactionRequested = false;
2341 try {
2342
2343
2344
2345
2346
2347 for (StoreFlushContext flush : storeFlushCtxs.values()) {
2348 flush.flushCache(status);
2349 }
2350
2351
2352
2353 Iterator<Store> it = storesToFlush.iterator();
2354
2355 for (StoreFlushContext flush : storeFlushCtxs.values()) {
2356 boolean needsCompaction = flush.commit(status);
2357 if (needsCompaction) {
2358 compactionRequested = true;
2359 }
2360 byte[] storeName = it.next().getFamily().getName();
2361 List<Path> storeCommittedFiles = flush.getCommittedFiles();
2362 committedFiles.put(storeName, storeCommittedFiles);
2363
2364 if (storeCommittedFiles == null || storeCommittedFiles.isEmpty()) {
2365 totalFlushableSizeOfFlushableStores -= prepareResult.storeFlushableSize.get(storeName);
2366 }
2367 }
2368 storeFlushCtxs.clear();
2369
2370
2371 this.addAndGetGlobalMemstoreSize(-totalFlushableSizeOfFlushableStores);
2372
2373 if (wal != null) {
2374
2375 FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.COMMIT_FLUSH,
2376 getRegionInfo(), flushOpSeqId, committedFiles);
2377 WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(),
2378 desc, sequenceId, true);
2379 }
2380 } catch (Throwable t) {
2381
2382
2383
2384
2385
2386
2387 if (wal != null) {
2388 try {
2389 FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.ABORT_FLUSH,
2390 getRegionInfo(), flushOpSeqId, committedFiles);
2391 WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(),
2392 desc, sequenceId, false);
2393 } catch (Throwable ex) {
2394 LOG.warn(getRegionInfo().getEncodedName() + " : "
2395 + "Received unexpected exception trying to write ABORT_FLUSH marker to WAL:"
2396 + StringUtils.stringifyException(ex));
2397
2398 }
2399 wal.abortCacheFlush(this.getRegionInfo().getEncodedNameAsBytes());
2400 }
2401 DroppedSnapshotException dse = new DroppedSnapshotException("region: " +
2402 Bytes.toStringBinary(getRegionInfo().getRegionName()));
2403 dse.initCause(t);
2404 status.abort("Flush failed: " + StringUtils.stringifyException(t));
2405
2406
2407
2408
2409
2410 this.closing.set(true);
2411
2412 if (rsServices != null) {
2413
2414 rsServices.abort("Replay of WAL required. Forcing server shutdown", dse);
2415 }
2416
2417 throw dse;
2418 }
2419
2420
2421 if (wal != null) {
2422 wal.completeCacheFlush(this.getRegionInfo().getEncodedNameAsBytes());
2423 }
2424
2425
2426 for (Store store: storesToFlush) {
2427 this.lastStoreFlushTimeMap.put(store, startTime);
2428 }
2429
2430
2431 this.maxFlushedSeqId = flushedSeqId;
2432
2433
2434 this.lastFlushOpSeqId = flushOpSeqId;
2435
2436
2437
2438 synchronized (this) {
2439 notifyAll();
2440 }
2441
2442 long time = EnvironmentEdgeManager.currentTime() - startTime;
2443 long memstoresize = this.memstoreSize.get();
2444 String msg = "Finished memstore flush of ~"
2445 + StringUtils.byteDesc(totalFlushableSizeOfFlushableStores) + "/"
2446 + totalFlushableSizeOfFlushableStores + ", currentsize="
2447 + StringUtils.byteDesc(memstoresize) + "/" + memstoresize
2448 + " for region " + this + " in " + time + "ms, sequenceid="
2449 + flushOpSeqId + ", compaction requested=" + compactionRequested
2450 + ((wal == null) ? "; wal=null" : "");
2451 LOG.info(msg);
2452 status.setStatus(msg);
2453
2454 return new FlushResultImpl(compactionRequested ?
2455 FlushResult.Result.FLUSHED_COMPACTION_NEEDED :
2456 FlushResult.Result.FLUSHED_NO_COMPACTION_NEEDED,
2457 flushOpSeqId);
2458 }
2459
2460
2461
2462
2463
2464
2465 @VisibleForTesting
2466 protected long getNextSequenceId(final WAL wal) throws IOException {
2467
2468
2469
2470
2471
2472 WALKey key = this.appendEmptyEdit(wal, null);
2473 return key.getSequenceId(maxWaitForSeqId);
2474 }
2475
2476
2477
2478
2479
2480 @Override
2481 public Result getClosestRowBefore(final byte [] row, final byte [] family) throws IOException {
2482 if (coprocessorHost != null) {
2483 Result result = new Result();
2484 if (coprocessorHost.preGetClosestRowBefore(row, family, result)) {
2485 return result;
2486 }
2487 }
2488
2489
2490 checkRow(row, "getClosestRowBefore");
2491 startRegionOperation(Operation.GET);
2492 this.readRequestsCount.increment();
2493 try {
2494 Store store = getStore(family);
2495
2496 Cell key = store.getRowKeyAtOrBefore(row);
2497 Result result = null;
2498 if (key != null) {
2499 Get get = new Get(CellUtil.cloneRow(key));
2500 get.addFamily(family);
2501 result = get(get);
2502 }
2503 if (coprocessorHost != null) {
2504 coprocessorHost.postGetClosestRowBefore(row, family, result);
2505 }
2506 return result;
2507 } finally {
2508 closeRegionOperation(Operation.GET);
2509 }
2510 }
2511
2512 @Override
2513 public RegionScanner getScanner(Scan scan) throws IOException {
2514 return getScanner(scan, null);
2515 }
2516
2517 protected RegionScanner getScanner(Scan scan,
2518 List<KeyValueScanner> additionalScanners) throws IOException {
2519 startRegionOperation(Operation.SCAN);
2520 try {
2521
2522 if (!scan.hasFamilies()) {
2523
2524 for (byte[] family: this.htableDescriptor.getFamiliesKeys()) {
2525 scan.addFamily(family);
2526 }
2527 } else {
2528 for (byte [] family : scan.getFamilyMap().keySet()) {
2529 checkFamily(family);
2530 }
2531 }
2532 return instantiateRegionScanner(scan, additionalScanners);
2533 } finally {
2534 closeRegionOperation(Operation.SCAN);
2535 }
2536 }
2537
2538 protected RegionScanner instantiateRegionScanner(Scan scan,
2539 List<KeyValueScanner> additionalScanners) throws IOException {
2540 if (scan.isReversed()) {
2541 if (scan.getFilter() != null) {
2542 scan.getFilter().setReversed(true);
2543 }
2544 return new ReversedRegionScannerImpl(scan, additionalScanners, this);
2545 }
2546 return new RegionScannerImpl(scan, additionalScanners, this);
2547 }
2548
2549 @Override
2550 public void prepareDelete(Delete delete) throws IOException {
2551
2552 if(delete.getFamilyCellMap().isEmpty()){
2553 for(byte [] family : this.htableDescriptor.getFamiliesKeys()){
2554
2555 delete.addFamily(family, delete.getTimeStamp());
2556 }
2557 } else {
2558 for(byte [] family : delete.getFamilyCellMap().keySet()) {
2559 if(family == null) {
2560 throw new NoSuchColumnFamilyException("Empty family is invalid");
2561 }
2562 checkFamily(family);
2563 }
2564 }
2565 }
2566
2567 @Override
2568 public void delete(Delete delete) throws IOException {
2569 checkReadOnly();
2570 checkResources();
2571 startRegionOperation(Operation.DELETE);
2572 try {
2573 delete.getRow();
2574
2575 doBatchMutate(delete);
2576 } finally {
2577 closeRegionOperation(Operation.DELETE);
2578 }
2579 }
2580
2581
2582
2583
/**
 * Row key given to the Delete that {@code delete(NavigableMap, Durability)} constructs;
 * per its name, intended for unit tests only.
 */
private static final byte [] FOR_UNIT_TESTS_ONLY = Bytes.toBytes("ForUnitTestsOnly");
2585
2586
2587
2588
2589
2590
2591 void delete(NavigableMap<byte[], List<Cell>> familyMap,
2592 Durability durability) throws IOException {
2593 Delete delete = new Delete(FOR_UNIT_TESTS_ONLY);
2594 delete.setFamilyCellMap(familyMap);
2595 delete.setDurability(durability);
2596 doBatchMutate(delete);
2597 }
2598
2599 @Override
2600 public void prepareDeleteTimestamps(Mutation mutation, Map<byte[], List<Cell>> familyMap,
2601 byte[] byteNow) throws IOException {
2602 for (Map.Entry<byte[], List<Cell>> e : familyMap.entrySet()) {
2603
2604 byte[] family = e.getKey();
2605 List<Cell> cells = e.getValue();
2606 assert cells instanceof RandomAccess;
2607
2608 Map<byte[], Integer> kvCount = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
2609 int listSize = cells.size();
2610 for (int i=0; i < listSize; i++) {
2611 Cell cell = cells.get(i);
2612
2613
2614 if (cell.getTimestamp() == HConstants.LATEST_TIMESTAMP && CellUtil.isDeleteType(cell)) {
2615 byte[] qual = CellUtil.cloneQualifier(cell);
2616 if (qual == null) qual = HConstants.EMPTY_BYTE_ARRAY;
2617
2618 Integer count = kvCount.get(qual);
2619 if (count == null) {
2620 kvCount.put(qual, 1);
2621 } else {
2622 kvCount.put(qual, count + 1);
2623 }
2624 count = kvCount.get(qual);
2625
2626 Get get = new Get(CellUtil.cloneRow(cell));
2627 get.setMaxVersions(count);
2628 get.addColumn(family, qual);
2629 if (coprocessorHost != null) {
2630 if (!coprocessorHost.prePrepareTimeStampForDeleteVersion(mutation, cell,
2631 byteNow, get)) {
2632 updateDeleteLatestVersionTimeStamp(cell, get, count, byteNow);
2633 }
2634 } else {
2635 updateDeleteLatestVersionTimeStamp(cell, get, count, byteNow);
2636 }
2637 } else {
2638 CellUtil.updateLatestStamp(cell, byteNow, 0);
2639 }
2640 }
2641 }
2642 }
2643
2644 void updateDeleteLatestVersionTimeStamp(Cell cell, Get get, int count, byte[] byteNow)
2645 throws IOException {
2646 List<Cell> result = get(get, false);
2647
2648 if (result.size() < count) {
2649
2650 CellUtil.updateLatestStamp(cell, byteNow, 0);
2651 return;
2652 }
2653 if (result.size() > count) {
2654 throw new RuntimeException("Unexpected size: " + result.size());
2655 }
2656 Cell getCell = result.get(count - 1);
2657 CellUtil.setTimestamp(cell, getCell.getTimestamp());
2658 }
2659
2660 @Override
2661 public void put(Put put) throws IOException {
2662 checkReadOnly();
2663
2664
2665
2666
2667
2668 checkResources();
2669 startRegionOperation(Operation.PUT);
2670 try {
2671
2672 doBatchMutate(put);
2673 } finally {
2674 closeRegionOperation(Operation.PUT);
2675 }
2676 }
2677
2678
2679
2680
2681
2682
2683 private abstract static class BatchOperationInProgress<T> {
2684 T[] operations;
2685 int nextIndexToProcess = 0;
2686 OperationStatus[] retCodeDetails;
2687 WALEdit[] walEditsFromCoprocessors;
2688
2689 public BatchOperationInProgress(T[] operations) {
2690 this.operations = operations;
2691 this.retCodeDetails = new OperationStatus[operations.length];
2692 this.walEditsFromCoprocessors = new WALEdit[operations.length];
2693 Arrays.fill(this.retCodeDetails, OperationStatus.NOT_RUN);
2694 }
2695
2696 public abstract Mutation getMutation(int index);
2697 public abstract long getNonceGroup(int index);
2698 public abstract long getNonce(int index);
2699
2700 public abstract Mutation[] getMutationsForCoprocs();
2701 public abstract boolean isInReplay();
2702 public abstract long getReplaySequenceId();
2703
2704 public boolean isDone() {
2705 return nextIndexToProcess == operations.length;
2706 }
2707 }
2708
2709 private static class MutationBatch extends BatchOperationInProgress<Mutation> {
2710 private long nonceGroup;
2711 private long nonce;
2712 public MutationBatch(Mutation[] operations, long nonceGroup, long nonce) {
2713 super(operations);
2714 this.nonceGroup = nonceGroup;
2715 this.nonce = nonce;
2716 }
2717
2718 @Override
2719 public Mutation getMutation(int index) {
2720 return this.operations[index];
2721 }
2722
2723 @Override
2724 public long getNonceGroup(int index) {
2725 return nonceGroup;
2726 }
2727
2728 @Override
2729 public long getNonce(int index) {
2730 return nonce;
2731 }
2732
2733 @Override
2734 public Mutation[] getMutationsForCoprocs() {
2735 return this.operations;
2736 }
2737
2738 @Override
2739 public boolean isInReplay() {
2740 return false;
2741 }
2742
2743 @Override
2744 public long getReplaySequenceId() {
2745 return 0;
2746 }
2747 }
2748
2749 private static class ReplayBatch extends BatchOperationInProgress<MutationReplay> {
2750 private long replaySeqId = 0;
2751 public ReplayBatch(MutationReplay[] operations, long seqId) {
2752 super(operations);
2753 this.replaySeqId = seqId;
2754 }
2755
2756 @Override
2757 public Mutation getMutation(int index) {
2758 return this.operations[index].mutation;
2759 }
2760
2761 @Override
2762 public long getNonceGroup(int index) {
2763 return this.operations[index].nonceGroup;
2764 }
2765
2766 @Override
2767 public long getNonce(int index) {
2768 return this.operations[index].nonce;
2769 }
2770
2771 @Override
2772 public Mutation[] getMutationsForCoprocs() {
2773 assert false;
2774 throw new RuntimeException("Should not be called for replay batch");
2775 }
2776
2777 @Override
2778 public boolean isInReplay() {
2779 return true;
2780 }
2781
2782 @Override
2783 public long getReplaySequenceId() {
2784 return this.replaySeqId;
2785 }
2786 }
2787
2788 @Override
2789 public OperationStatus[] batchMutate(Mutation[] mutations, long nonceGroup, long nonce)
2790 throws IOException {
2791
2792
2793
2794
2795 return batchMutate(new MutationBatch(mutations, nonceGroup, nonce));
2796 }
2797
2798 public OperationStatus[] batchMutate(Mutation[] mutations) throws IOException {
2799 return batchMutate(mutations, HConstants.NO_NONCE, HConstants.NO_NONCE);
2800 }
2801
2802 @Override
2803 public OperationStatus[] batchReplay(MutationReplay[] mutations, long replaySeqId)
2804 throws IOException {
2805 if (!RegionReplicaUtil.isDefaultReplica(getRegionInfo())
2806 && replaySeqId < lastReplayedOpenRegionSeqId) {
2807
2808
2809 if (LOG.isTraceEnabled()) {
2810 LOG.trace(getRegionInfo().getEncodedName() + " : "
2811 + "Skipping " + mutations.length + " mutations with replaySeqId=" + replaySeqId
2812 + " which is < than lastReplayedOpenRegionSeqId=" + lastReplayedOpenRegionSeqId);
2813 for (MutationReplay mut : mutations) {
2814 LOG.trace(getRegionInfo().getEncodedName() + " : Skipping : " + mut.mutation);
2815 }
2816 }
2817
2818 OperationStatus[] statuses = new OperationStatus[mutations.length];
2819 for (int i = 0; i < statuses.length; i++) {
2820 statuses[i] = OperationStatus.SUCCESS;
2821 }
2822 return statuses;
2823 }
2824 return batchMutate(new ReplayBatch(mutations, replaySeqId));
2825 }
2826
2827
2828
2829
2830
2831
2832
2833
2834
2835 OperationStatus[] batchMutate(BatchOperationInProgress<?> batchOp) throws IOException {
2836 boolean initialized = false;
2837 Operation op = batchOp.isInReplay() ? Operation.REPLAY_BATCH_MUTATE : Operation.BATCH_MUTATE;
2838 startRegionOperation(op);
2839 try {
2840 while (!batchOp.isDone()) {
2841 if (!batchOp.isInReplay()) {
2842 checkReadOnly();
2843 }
2844 checkResources();
2845
2846 if (!initialized) {
2847 this.writeRequestsCount.add(batchOp.operations.length);
2848 if (!batchOp.isInReplay()) {
2849 doPreMutationHook(batchOp);
2850 }
2851 initialized = true;
2852 }
2853 doMiniBatchMutation(batchOp);
2854 long newSize = this.getMemstoreSize();
2855 if (isFlushSize(newSize)) {
2856 requestFlush();
2857 }
2858 }
2859 } finally {
2860 closeRegionOperation(op);
2861 }
2862 return batchOp.retCodeDetails;
2863 }
2864
2865
2866 private void doPreMutationHook(BatchOperationInProgress<?> batchOp)
2867 throws IOException {
2868
2869 WALEdit walEdit = new WALEdit();
2870 if (coprocessorHost != null) {
2871 for (int i = 0 ; i < batchOp.operations.length; i++) {
2872 Mutation m = batchOp.getMutation(i);
2873 if (m instanceof Put) {
2874 if (coprocessorHost.prePut((Put) m, walEdit, m.getDurability())) {
2875
2876
2877 batchOp.retCodeDetails[i] = OperationStatus.SUCCESS;
2878 }
2879 } else if (m instanceof Delete) {
2880 Delete curDel = (Delete) m;
2881 if (curDel.getFamilyCellMap().isEmpty()) {
2882
2883 prepareDelete(curDel);
2884 }
2885 if (coprocessorHost.preDelete(curDel, walEdit, m.getDurability())) {
2886
2887
2888 batchOp.retCodeDetails[i] = OperationStatus.SUCCESS;
2889 }
2890 } else {
2891
2892
2893
2894 batchOp.retCodeDetails[i] = new OperationStatus(OperationStatusCode.FAILURE,
2895 "Put/Delete mutations only supported in batchMutate() now");
2896 }
2897 if (!walEdit.isEmpty()) {
2898 batchOp.walEditsFromCoprocessors[i] = walEdit;
2899 walEdit = new WALEdit();
2900 }
2901 }
2902 }
2903 }
2904
/**
 * Processes the next "mini-batch" of {@code batchOp}: the operations in
 * [nextIndexToProcess, lastIndexExclusive) that pass validation and whose row locks
 * could be acquired. The operations are applied to the memstore, appended to the WAL,
 * the WAL is synced (or deferred), coprocessor hooks are run and the MVCC write entry
 * is completed. {@code batchOp.nextIndexToProcess} is always advanced to
 * lastIndexExclusive before returning, including on failure.
 *
 * @param batchOp the batch being worked through; its retCodeDetails are updated
 * @return the sum of the sizes reported by applyFamilyMapToMemstore for this
 *         mini-batch, or 0 when nothing was ready to write or preBatchMutate returned
 *         true
 * @throws IOException if any step fails; operations still in NOT_RUN state in the
 *         mini-batch range are then marked FAILURE
 */
@SuppressWarnings("unchecked")
private long doMiniBatchMutation(BatchOperationInProgress<?> batchOp) throws IOException {
  boolean isInReplay = batchOp.isInReplay();

  // The four variables below track whether all puts (resp. deletes) in the mini-batch
  // touch the same family set as the first one seen. They are written in the lock loop
  // but not read again within this method.
  boolean putsCfSetConsistent = true;

  Set<byte[]> putsCfSet = null;

  boolean deletesCfSetConsistent = true;

  Set<byte[]> deletesCfSet = null;

  long currentNonceGroup = HConstants.NO_NONCE, currentNonce = HConstants.NO_NONCE;
  WALEdit walEdit = new WALEdit(isInReplay);
  MultiVersionConsistencyControl.WriteEntry writeEntry = null;
  long txid = 0;
  boolean doRollBackMemstore = false;
  boolean locked = false;

  // Row locks held by this mini-batch; released after the WAL append and again in finally.
  List<RowLock> acquiredRowLocks = Lists.newArrayListWithCapacity(batchOp.operations.length);

  // Family maps are referenced directly (not copied), indexed like batchOp.operations.
  Map<byte[], List<Cell>>[] familyMaps = new Map[batchOp.operations.length];
  List<Cell> memstoreCells = new ArrayList<Cell>();

  // The mini-batch covers the index range [firstIndex, lastIndexExclusive).
  int firstIndex = batchOp.nextIndexToProcess;
  int lastIndexExclusive = firstIndex;
  boolean success = false;
  int noOfPuts = 0, noOfDeletes = 0;
  WALKey walKey = null;
  long mvccNum = 0;
  long addedSize = 0;
  try {
    // ------------------------------------------------------------------
    // Validate operations and acquire row locks, extending the mini-batch
    // until a lock cannot be obtained or the batch is exhausted.
    // ------------------------------------------------------------------
    int numReadyToWrite = 0;
    long now = EnvironmentEdgeManager.currentTime();
    while (lastIndexExclusive < batchOp.operations.length) {
      Mutation mutation = batchOp.getMutation(lastIndexExclusive);
      boolean isPutMutation = mutation instanceof Put;

      Map<byte[], List<Cell>> familyMap = mutation.getFamilyCellMap();

      familyMaps[lastIndexExclusive] = familyMap;

      // Skip anything that already has a status other than NOT_RUN.
      if (batchOp.retCodeDetails[lastIndexExclusive].getOperationStatusCode()
          != OperationStatusCode.NOT_RUN) {
        lastIndexExclusive++;
        continue;
      }

      // Per-operation validation; a failure is recorded in retCodeDetails and the
      // operation is skipped rather than failing the whole mini-batch.
      try {
        if (isPutMutation) {

          if (isInReplay) {
            removeNonExistentColumnFamilyForReplay(familyMap);
          } else {
            checkFamilies(familyMap.keySet());
          }
          checkTimestamps(mutation.getFamilyCellMap(), now);
        } else {
          prepareDelete((Delete) mutation);
        }
        checkRow(mutation.getRow(), "doMiniBatchMutation");
      } catch (NoSuchColumnFamilyException nscf) {
        LOG.warn("No such column family in batch mutation", nscf);
        batchOp.retCodeDetails[lastIndexExclusive] = new OperationStatus(
            OperationStatusCode.BAD_FAMILY, nscf.getMessage());
        lastIndexExclusive++;
        continue;
      } catch (FailedSanityCheckException fsce) {
        LOG.warn("Batch Mutation did not pass sanity check", fsce);
        batchOp.retCodeDetails[lastIndexExclusive] = new OperationStatus(
            OperationStatusCode.SANITY_CHECK_FAILURE, fsce.getMessage());
        lastIndexExclusive++;
        continue;
      } catch (WrongRegionException we) {
        LOG.warn("Batch mutation had a row that does not belong to this region", we);
        batchOp.retCodeDetails[lastIndexExclusive] = new OperationStatus(
            OperationStatusCode.SANITY_CHECK_FAILURE, we.getMessage());
        lastIndexExclusive++;
        continue;
      }

      // shouldBlock is true only while no lock has been acquired yet in this
      // mini-batch. NOTE(review): presumably makes getRowLockInternal wait for the
      // first lock and not for later ones -- confirm against getRowLockInternal.
      boolean shouldBlock = numReadyToWrite == 0;
      RowLock rowLock = null;
      try {
        rowLock = getRowLockInternal(mutation.getRow(), shouldBlock);
      } catch (IOException ioe) {
        // Logged only; rowLock stays null and ends the mini-batch below.
        LOG.warn("Failed getting lock in batch put, row="
          + Bytes.toStringBinary(mutation.getRow()), ioe);
      }
      if (rowLock == null) {
        // Could not get the lock: stop here; this operation is left for a later call.
        break;
      } else {
        acquiredRowLocks.add(rowLock);
      }

      lastIndexExclusive++;
      numReadyToWrite++;

      if (isPutMutation) {
        // Remember the first put's family set and compare later puts against it.
        if (putsCfSet == null) {
          putsCfSet = mutation.getFamilyCellMap().keySet();
        } else {
          putsCfSetConsistent = putsCfSetConsistent
              && mutation.getFamilyCellMap().keySet().equals(putsCfSet);
        }
      } else {
        // Same bookkeeping for deletes.
        if (deletesCfSet == null) {
          deletesCfSet = mutation.getFamilyCellMap().keySet();
        } else {
          deletesCfSetConsistent = deletesCfSetConsistent
              && mutation.getFamilyCellMap().keySet().equals(deletesCfSet);
        }
      }
    }

    // Re-read the clock after the locks are held; this value stamps the cells.
    now = EnvironmentEdgeManager.currentTime();
    byte[] byteNow = Bytes.toBytes(now);

    // Nothing was locked: bail out (the finally block still advances the index).
    if (numReadyToWrite <= 0) return 0L;

    // ------------------------------------------------------------------
    // Update cell timestamps and tags (skipped entirely for replay, since
    // the loop condition includes !isInReplay).
    // ------------------------------------------------------------------
    for (int i = firstIndex; !isInReplay && i < lastIndexExclusive; i++) {

      if (batchOp.retCodeDetails[i].getOperationStatusCode()
          != OperationStatusCode.NOT_RUN) continue;

      Mutation mutation = batchOp.getMutation(i);
      if (mutation instanceof Put) {
        updateCellTimestamps(familyMaps[i].values(), byteNow);
        noOfPuts++;
      } else {
        prepareDeleteTimestamps(mutation, familyMaps[i], byteNow);
        noOfDeletes++;
      }
      rewriteCellTags(familyMaps[i], mutation);
    }

    // Take the updates read lock; it is released after the WAL append or in finally.
    lock(this.updatesLock.readLock(), numReadyToWrite);
    locked = true;
    if(isInReplay) {
      mvccNum = batchOp.getReplaySequenceId();
    } else {
      mvccNum = MultiVersionConsistencyControl.getPreAssignedWriteNumber(this.sequenceId);
    }

    // Open the MVCC write entry under the chosen number.
    writeEntry = mvcc.beginMemstoreInsertWithSeqNum(mvccNum);

    // Coprocessor batch pre-hook; a true return stops processing of this mini-batch.
    if (!isInReplay && coprocessorHost != null) {
      MiniBatchOperationInProgress<Mutation> miniBatchOp =
        new MiniBatchOperationInProgress<Mutation>(batchOp.getMutationsForCoprocs(),
        batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive);
      if (coprocessorHost.preBatchMutate(miniBatchOp)) return 0L;
    }

    // ------------------------------------------------------------------
    // Write to the memstore. From the first write on, a failure before the
    // WAL sync completes triggers a memstore rollback in finally.
    // ------------------------------------------------------------------
    for (int i = firstIndex; i < lastIndexExclusive; i++) {
      if (batchOp.retCodeDetails[i].getOperationStatusCode()
          != OperationStatusCode.NOT_RUN) {
        continue;
      }
      doRollBackMemstore = true;
      addedSize += applyFamilyMapToMemstore(familyMaps[i], mvccNum, memstoreCells, isInReplay);
    }

    // ------------------------------------------------------------------
    // Build the WAL edit and pick the durability with the highest ordinal
    // among the mini-batch's mutations.
    // ------------------------------------------------------------------
    Durability durability = Durability.USE_DEFAULT;
    for (int i = firstIndex; i < lastIndexExclusive; i++) {

      if (batchOp.retCodeDetails[i].getOperationStatusCode()
          != OperationStatusCode.NOT_RUN) {
        continue;
      }
      batchOp.retCodeDetails[i] = OperationStatus.SUCCESS;

      Mutation m = batchOp.getMutation(i);
      Durability tmpDur = getEffectiveDurability(m.getDurability());
      if (tmpDur.ordinal() > durability.ordinal()) {
        durability = tmpDur;
      }
      if (tmpDur == Durability.SKIP_WAL) {
        // Not added to the WAL edit; only recorded as a mutation without WAL.
        recordMutationWithoutWal(m.getFamilyCellMap());
        continue;
      }

      long nonceGroup = batchOp.getNonceGroup(i), nonce = batchOp.getNonce(i);

      // A change of nonce within the mini-batch is only allowed in replay; the edits
      // collected so far are appended under the previous nonce and a new edit is begun.
      if (nonceGroup != currentNonceGroup || nonce != currentNonce) {
        if (walEdit.size() > 0) {
          assert isInReplay;
          if (!isInReplay) {
            throw new IOException("Multiple nonces per batch and not in replay");
          }

          walKey = new ReplayHLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
            this.htableDescriptor.getTableName(), now, m.getClusterIds(),
            currentNonceGroup, currentNonce);
          txid = this.wal.append(this.htableDescriptor,  this.getRegionInfo(),  walKey,
            walEdit, getSequenceId(), true, null);
          walEdit = new WALEdit(isInReplay);
          walKey = null;
        }
        currentNonceGroup = nonceGroup;
        currentNonce = nonce;
      }

      // Cells contributed by coprocessor pre-hooks go in ahead of the mutation's own.
      WALEdit fromCP = batchOp.walEditsFromCoprocessors[i];
      if (fromCP != null) {
        for (Cell cell : fromCP.getCells()) {
          walEdit.add(cell);
        }
      }
      addFamilyMapToWALEdit(familyMaps[i], walEdit);
    }

    // ------------------------------------------------------------------
    // Append the final edit to the WAL (no sync yet). The cluster ids of the
    // first mutation in the mini-batch are used for the key.
    // ------------------------------------------------------------------
    Mutation mutation = batchOp.getMutation(firstIndex);
    if (isInReplay) {

      walKey = new ReplayHLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
        this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now,
        mutation.getClusterIds(), currentNonceGroup, currentNonce);
      long replaySeqId = batchOp.getReplaySequenceId();
      walKey.setOrigLogSeqNum(replaySeqId);

      // Raise the region sequence id to at least replaySeqId using a CAS retry loop.
      while (true) {
        long seqId = getSequenceId().get();
        if (seqId >= replaySeqId) break;
        if (getSequenceId().compareAndSet(seqId, replaySeqId)) break;
      }
    }
    if (walEdit.size() > 0) {
      if (!isInReplay) {

        walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
          this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now,
          mutation.getClusterIds(), currentNonceGroup, currentNonce);
      }

      txid = this.wal.append(this.htableDescriptor, this.getRegionInfo(), walKey, walEdit,
        getSequenceId(), true, memstoreCells);
    }
    if (walKey == null){
      // No key was created above (non-replay with an empty edit): append an empty edit
      // so that a key exists for completing the MVCC entry.
      walKey = this.appendEmptyEdit(this.wal, memstoreCells);
    }

    // ------------------------------------------------------------------
    // Release the updates lock and the row locks before syncing.
    // ------------------------------------------------------------------
    if (locked) {
      this.updatesLock.readLock().unlock();
      locked = false;
    }
    releaseRowLocks(acquiredRowLocks);

    // ------------------------------------------------------------------
    // Sync the WAL (or defer, depending on durability) if anything was appended.
    // ------------------------------------------------------------------
    if (txid != 0) {
      syncOrDefer(txid, durability);
    }

    // Past this point the memstore changes are kept even if a later step throws.
    doRollBackMemstore = false;

    // Coprocessor batch post-hook.
    if (!isInReplay && coprocessorHost != null) {
      MiniBatchOperationInProgress<Mutation> miniBatchOp =
        new MiniBatchOperationInProgress<Mutation>(batchOp.getMutationsForCoprocs(),
        batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive);
      coprocessorHost.postBatchMutate(miniBatchOp);
    }

    // ------------------------------------------------------------------
    // Complete the MVCC write entry; nulling it keeps finally from doing so again.
    // ------------------------------------------------------------------
    if (writeEntry != null) {
      mvcc.completeMemstoreInsertWithSeqNum(writeEntry, walKey);
      writeEntry = null;
    }

    // ------------------------------------------------------------------
    // Per-mutation coprocessor post-hooks, for operations that ended as SUCCESS.
    // ------------------------------------------------------------------
    if (!isInReplay && coprocessorHost != null) {
      for (int i = firstIndex; i < lastIndexExclusive; i++) {

        if (batchOp.retCodeDetails[i].getOperationStatusCode()
            != OperationStatusCode.SUCCESS) {
          continue;
        }
        Mutation m = batchOp.getMutation(i);
        if (m instanceof Put) {
          coprocessorHost.postPut((Put) m, walEdit, m.getDurability());
        } else {
          coprocessorHost.postDelete((Delete) m, walEdit, m.getDurability());
        }
      }
    }

    success = true;
    return addedSize;
  } finally {

    // Either undo the memstore writes (failure before the sync completed) or account
    // for the added size; in both cases settle any MVCC write entry still open.
    if (doRollBackMemstore) {
      rollbackMemstore(memstoreCells);
      if (writeEntry != null) mvcc.cancelMemstoreInsert(writeEntry);
    } else {
      this.addAndGetGlobalMemstoreSize(addedSize);
      if (writeEntry != null) {
        mvcc.completeMemstoreInsertWithSeqNum(writeEntry, walKey);
      }
    }

    if (locked) {
      this.updatesLock.readLock().unlock();
    }
    // Called unconditionally, including on the success path where the locks were
    // already released above.
    releaseRowLocks(acquiredRowLocks);

    // Metrics: one update per mini-batch that contained at least one put / delete.
    if (noOfPuts > 0) {

      if (this.metricsRegion != null) {
        this.metricsRegion.updatePut();
      }
    }
    if (noOfDeletes > 0) {

      if (this.metricsRegion != null) {
        this.metricsRegion.updateDelete();
      }
    }
    // On failure, anything in the range that never got a status is marked FAILURE.
    if (!success) {
      for (int i = firstIndex; i < lastIndexExclusive; i++) {
        if (batchOp.retCodeDetails[i].getOperationStatusCode() == OperationStatusCode.NOT_RUN) {
          batchOp.retCodeDetails[i] = OperationStatus.FAILURE;
        }
      }
    }
    // This hook runs whether or not the mini-batch succeeded.
    if (coprocessorHost != null && !batchOp.isInReplay()) {

      MiniBatchOperationInProgress<Mutation> miniBatchOp =
        new MiniBatchOperationInProgress<Mutation>(batchOp.getMutationsForCoprocs(),
        batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex,
        lastIndexExclusive);
      coprocessorHost.postBatchMutateIndispensably(miniBatchOp, success);
    }

    // Always advance, so the caller's loop continues after this mini-batch.
    batchOp.nextIndexToProcess = lastIndexExclusive;
  }
}
3302
3303
3304
3305
3306
3307 protected Durability getEffectiveDurability(Durability d) {
3308 return d == Durability.USE_DEFAULT ? this.durability : d;
3309 }
3310
3311
3312
3313
3314
3315
3316 @Override
3317 public boolean checkAndMutate(byte [] row, byte [] family, byte [] qualifier,
3318 CompareOp compareOp, ByteArrayComparable comparator, Mutation w,
3319 boolean writeToWAL)
3320 throws IOException{
3321 checkReadOnly();
3322
3323
3324 checkResources();
3325 boolean isPut = w instanceof Put;
3326 if (!isPut && !(w instanceof Delete))
3327 throw new org.apache.hadoop.hbase.DoNotRetryIOException("Action must " +
3328 "be Put or Delete");
3329 if (!Bytes.equals(row, w.getRow())) {
3330 throw new org.apache.hadoop.hbase.DoNotRetryIOException("Action's " +
3331 "getRow must match the passed row");
3332 }
3333
3334 startRegionOperation();
3335 try {
3336 Get get = new Get(row);
3337 checkFamily(family);
3338 get.addColumn(family, qualifier);
3339
3340
3341 RowLock rowLock = getRowLock(get.getRow());
3342
3343 mvcc.waitForPreviousTransactionsComplete();
3344 try {
3345 if (this.getCoprocessorHost() != null) {
3346 Boolean processed = null;
3347 if (w instanceof Put) {
3348 processed = this.getCoprocessorHost().preCheckAndPutAfterRowLock(row, family,
3349 qualifier, compareOp, comparator, (Put) w);
3350 } else if (w instanceof Delete) {
3351 processed = this.getCoprocessorHost().preCheckAndDeleteAfterRowLock(row, family,
3352 qualifier, compareOp, comparator, (Delete) w);
3353 }
3354 if (processed != null) {
3355 return processed;
3356 }
3357 }
3358 List<Cell> result = get(get, false);
3359
3360 boolean valueIsNull = comparator.getValue() == null ||
3361 comparator.getValue().length == 0;
3362 boolean matches = false;
3363 long cellTs = 0;
3364 if (result.size() == 0 && valueIsNull) {
3365 matches = true;
3366 } else if (result.size() > 0 && result.get(0).getValueLength() == 0 &&
3367 valueIsNull) {
3368 matches = true;
3369 cellTs = result.get(0).getTimestamp();
3370 } else if (result.size() == 1 && !valueIsNull) {
3371 Cell kv = result.get(0);
3372 cellTs = kv.getTimestamp();
3373 int compareResult = comparator.compareTo(kv.getValueArray(),
3374 kv.getValueOffset(), kv.getValueLength());
3375 switch (compareOp) {
3376 case LESS:
3377 matches = compareResult < 0;
3378 break;
3379 case LESS_OR_EQUAL:
3380 matches = compareResult <= 0;
3381 break;
3382 case EQUAL:
3383 matches = compareResult == 0;
3384 break;
3385 case NOT_EQUAL:
3386 matches = compareResult != 0;
3387 break;
3388 case GREATER_OR_EQUAL:
3389 matches = compareResult >= 0;
3390 break;
3391 case GREATER:
3392 matches = compareResult > 0;
3393 break;
3394 default:
3395 throw new RuntimeException("Unknown Compare op " + compareOp.name());
3396 }
3397 }
3398
3399 if (matches) {
3400
3401
3402
3403
3404 long now = EnvironmentEdgeManager.currentTime();
3405 long ts = Math.max(now, cellTs);
3406 byte[] byteTs = Bytes.toBytes(ts);
3407
3408 if (w instanceof Put) {
3409 updateCellTimestamps(w.getFamilyCellMap().values(), byteTs);
3410 }
3411
3412
3413
3414
3415
3416 doBatchMutate(w);
3417 this.checkAndMutateChecksPassed.increment();
3418 return true;
3419 }
3420 this.checkAndMutateChecksFailed.increment();
3421 return false;
3422 } finally {
3423 rowLock.release();
3424 }
3425 } finally {
3426 closeRegionOperation();
3427 }
3428 }
3429
3430
3431
3432
3433
3434
3435 @Override
3436 public boolean checkAndRowMutate(byte [] row, byte [] family, byte [] qualifier,
3437 CompareOp compareOp, ByteArrayComparable comparator, RowMutations rm,
3438 boolean writeToWAL) throws IOException {
3439 checkReadOnly();
3440
3441
3442 checkResources();
3443
3444 startRegionOperation();
3445 try {
3446 Get get = new Get(row);
3447 checkFamily(family);
3448 get.addColumn(family, qualifier);
3449
3450
3451 RowLock rowLock = getRowLock(get.getRow());
3452
3453 mvcc.waitForPreviousTransactionsComplete();
3454 try {
3455 List<Cell> result = get(get, false);
3456
3457 boolean valueIsNull = comparator.getValue() == null ||
3458 comparator.getValue().length == 0;
3459 boolean matches = false;
3460 long cellTs = 0;
3461 if (result.size() == 0 && valueIsNull) {
3462 matches = true;
3463 } else if (result.size() > 0 && result.get(0).getValueLength() == 0 &&
3464 valueIsNull) {
3465 matches = true;
3466 cellTs = result.get(0).getTimestamp();
3467 } else if (result.size() == 1 && !valueIsNull) {
3468 Cell kv = result.get(0);
3469 cellTs = kv.getTimestamp();
3470 int compareResult = comparator.compareTo(kv.getValueArray(),
3471 kv.getValueOffset(), kv.getValueLength());
3472 switch (compareOp) {
3473 case LESS:
3474 matches = compareResult < 0;
3475 break;
3476 case LESS_OR_EQUAL:
3477 matches = compareResult <= 0;
3478 break;
3479 case EQUAL:
3480 matches = compareResult == 0;
3481 break;
3482 case NOT_EQUAL:
3483 matches = compareResult != 0;
3484 break;
3485 case GREATER_OR_EQUAL:
3486 matches = compareResult >= 0;
3487 break;
3488 case GREATER:
3489 matches = compareResult > 0;
3490 break;
3491 default:
3492 throw new RuntimeException("Unknown Compare op " + compareOp.name());
3493 }
3494 }
3495
3496 if (matches) {
3497
3498
3499
3500
3501 long now = EnvironmentEdgeManager.currentTime();
3502 long ts = Math.max(now, cellTs);
3503 byte[] byteTs = Bytes.toBytes(ts);
3504
3505 for (Mutation w : rm.getMutations()) {
3506 if (w instanceof Put) {
3507 updateCellTimestamps(w.getFamilyCellMap().values(), byteTs);
3508 }
3509
3510
3511 }
3512
3513
3514
3515 mutateRow(rm);
3516 this.checkAndMutateChecksPassed.increment();
3517 return true;
3518 }
3519 this.checkAndMutateChecksFailed.increment();
3520 return false;
3521 } finally {
3522 rowLock.release();
3523 }
3524 } finally {
3525 closeRegionOperation();
3526 }
3527 }
3528
3529 private void doBatchMutate(Mutation mutation) throws IOException {
3530
3531 OperationStatus[] batchMutate = this.batchMutate(new Mutation[] { mutation });
3532 if (batchMutate[0].getOperationStatusCode().equals(OperationStatusCode.SANITY_CHECK_FAILURE)) {
3533 throw new FailedSanityCheckException(batchMutate[0].getExceptionMsg());
3534 } else if (batchMutate[0].getOperationStatusCode().equals(OperationStatusCode.BAD_FAMILY)) {
3535 throw new NoSuchColumnFamilyException(batchMutate[0].getExceptionMsg());
3536 }
3537 }
3538
3539
3540
3541
3542
3543
3544
3545
3546
3547
3548
3549
3550
3551
3552 public void addRegionToSnapshot(SnapshotDescription desc,
3553 ForeignExceptionSnare exnSnare) throws IOException {
3554 Path rootDir = FSUtils.getRootDir(conf);
3555 Path snapshotDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(desc, rootDir);
3556
3557 SnapshotManifest manifest = SnapshotManifest.create(conf, getFilesystem(),
3558 snapshotDir, desc, exnSnare);
3559 manifest.addRegion(this);
3560 }
3561
3562 @Override
3563 public void updateCellTimestamps(final Iterable<List<Cell>> cellItr, final byte[] now)
3564 throws IOException {
3565 for (List<Cell> cells: cellItr) {
3566 if (cells == null) continue;
3567 assert cells instanceof RandomAccess;
3568 int listSize = cells.size();
3569 for (int i = 0; i < listSize; i++) {
3570 CellUtil.updateLatestStamp(cells.get(i), now, 0);
3571 }
3572 }
3573 }
3574
3575
3576
3577
3578 void rewriteCellTags(Map<byte[], List<Cell>> familyMap, final Mutation m) {
3579
3580
3581
3582 if (m.getTTL() == Long.MAX_VALUE) {
3583 return;
3584 }
3585
3586
3587
3588 for (Map.Entry<byte[], List<Cell>> e: familyMap.entrySet()) {
3589 List<Cell> cells = e.getValue();
3590 assert cells instanceof RandomAccess;
3591 int listSize = cells.size();
3592 for (int i = 0; i < listSize; i++) {
3593 Cell cell = cells.get(i);
3594 List<Tag> newTags = Tag.carryForwardTags(null, cell);
3595 newTags = carryForwardTTLTag(newTags, m);
3596
3597
3598 cells.set(i, new KeyValue(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(),
3599 cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(),
3600 cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(),
3601 cell.getTimestamp(), KeyValue.Type.codeToType(cell.getTypeByte()),
3602 cell.getValueArray(), cell.getValueOffset(), cell.getValueLength(),
3603 newTags));
3604 }
3605 }
3606 }
3607
3608
3609
3610
3611
3612
3613
3614 private void checkResources() throws RegionTooBusyException {
3615
3616 if (this.getRegionInfo().isMetaRegion()) return;
3617
3618 if (this.memstoreSize.get() > this.blockingMemStoreSize) {
3619 blockedRequestsCount.increment();
3620 requestFlush();
3621 throw new RegionTooBusyException("Above memstore limit, " +
3622 "regionName=" + (this.getRegionInfo() == null ? "unknown" :
3623 this.getRegionInfo().getRegionNameAsString()) +
3624 ", server=" + (this.getRegionServerServices() == null ? "unknown" :
3625 this.getRegionServerServices().getServerName()) +
3626 ", memstoreSize=" + memstoreSize.get() +
3627 ", blockingMemStoreSize=" + blockingMemStoreSize);
3628 }
3629 }
3630
3631
3632
3633
3634 protected void checkReadOnly() throws IOException {
3635 if (isReadOnly()) {
3636 throw new DoNotRetryIOException("region is read only");
3637 }
3638 }
3639
3640 protected void checkReadsEnabled() throws IOException {
3641 if (!this.writestate.readsEnabled) {
3642 throw new IOException(getRegionInfo().getEncodedName()
3643 + ": The region's reads are disabled. Cannot serve the request");
3644 }
3645 }
3646
3647 public void setReadsEnabled(boolean readsEnabled) {
3648 if (readsEnabled && !this.writestate.readsEnabled) {
3649 LOG.info(getRegionInfo().getEncodedName() + " : Enabling reads for region.");
3650 }
3651 this.writestate.setReadsEnabled(readsEnabled);
3652 }
3653
3654
3655
3656
3657
3658
3659
3660 private void put(final byte [] row, byte [] family, List<Cell> edits)
3661 throws IOException {
3662 NavigableMap<byte[], List<Cell>> familyMap;
3663 familyMap = new TreeMap<byte[], List<Cell>>(Bytes.BYTES_COMPARATOR);
3664
3665 familyMap.put(family, edits);
3666 Put p = new Put(row);
3667 p.setFamilyCellMap(familyMap);
3668 doBatchMutate(p);
3669 }
3670
3671
3672
3673
3674
3675
3676
3677
3678
3679
3680
3681
3682
3683
3684
3685
3686 private long applyFamilyMapToMemstore(Map<byte[], List<Cell>> familyMap,
3687 long mvccNum, List<Cell> memstoreCells, boolean isInReplay) throws IOException {
3688 long size = 0;
3689
3690 for (Map.Entry<byte[], List<Cell>> e : familyMap.entrySet()) {
3691 byte[] family = e.getKey();
3692 List<Cell> cells = e.getValue();
3693 assert cells instanceof RandomAccess;
3694 Store store = getStore(family);
3695 int listSize = cells.size();
3696 for (int i=0; i < listSize; i++) {
3697 Cell cell = cells.get(i);
3698 CellUtil.setSequenceId(cell, mvccNum);
3699 Pair<Long, Cell> ret = store.add(cell);
3700 size += ret.getFirst();
3701 memstoreCells.add(ret.getSecond());
3702 if(isInReplay) {
3703
3704 CellUtil.setSequenceId(ret.getSecond(), mvccNum);
3705 }
3706 }
3707 }
3708
3709 return size;
3710 }
3711
3712
3713
3714
3715
3716
3717 private void rollbackMemstore(List<Cell> memstoreCells) {
3718 int kvsRolledback = 0;
3719
3720 for (Cell cell : memstoreCells) {
3721 byte[] family = CellUtil.cloneFamily(cell);
3722 Store store = getStore(family);
3723 store.rollback(cell);
3724 kvsRolledback++;
3725 }
3726 LOG.debug("rollbackMemstore rolled back " + kvsRolledback);
3727 }
3728
3729 @Override
3730 public void checkFamilies(Collection<byte[]> families) throws NoSuchColumnFamilyException {
3731 for (byte[] family : families) {
3732 checkFamily(family);
3733 }
3734 }
3735
3736
3737
3738
3739
3740 private void removeNonExistentColumnFamilyForReplay(
3741 final Map<byte[], List<Cell>> familyMap) {
3742 List<byte[]> nonExistentList = null;
3743 for (byte[] family : familyMap.keySet()) {
3744 if (!this.htableDescriptor.hasFamily(family)) {
3745 if (nonExistentList == null) {
3746 nonExistentList = new ArrayList<byte[]>();
3747 }
3748 nonExistentList.add(family);
3749 }
3750 }
3751 if (nonExistentList != null) {
3752 for (byte[] family : nonExistentList) {
3753
3754 LOG.info("No family for " + Bytes.toString(family) + " omit from reply.");
3755 familyMap.remove(family);
3756 }
3757 }
3758 }
3759
3760 @Override
3761 public void checkTimestamps(final Map<byte[], List<Cell>> familyMap, long now)
3762 throws FailedSanityCheckException {
3763 if (timestampSlop == HConstants.LATEST_TIMESTAMP) {
3764 return;
3765 }
3766 long maxTs = now + timestampSlop;
3767 for (List<Cell> kvs : familyMap.values()) {
3768 assert kvs instanceof RandomAccess;
3769 int listSize = kvs.size();
3770 for (int i=0; i < listSize; i++) {
3771 Cell cell = kvs.get(i);
3772
3773 long ts = cell.getTimestamp();
3774 if (ts != HConstants.LATEST_TIMESTAMP && ts > maxTs) {
3775 throw new FailedSanityCheckException("Timestamp for KV out of range "
3776 + cell + " (too.new=" + timestampSlop + ")");
3777 }
3778 }
3779 }
3780 }
3781
3782
3783
3784
3785
3786
3787
3788 private void addFamilyMapToWALEdit(Map<byte[], List<Cell>> familyMap,
3789 WALEdit walEdit) {
3790 for (List<Cell> edits : familyMap.values()) {
3791 assert edits instanceof RandomAccess;
3792 int listSize = edits.size();
3793 for (int i=0; i < listSize; i++) {
3794 Cell cell = edits.get(i);
3795 walEdit.add(cell);
3796 }
3797 }
3798 }
3799
3800 private void requestFlush() {
3801 if (this.rsServices == null) {
3802 return;
3803 }
3804 synchronized (writestate) {
3805 if (this.writestate.isFlushRequested()) {
3806 return;
3807 }
3808 writestate.flushRequested = true;
3809 }
3810
3811 this.rsServices.getFlushRequester().requestFlush(this, false);
3812 if (LOG.isDebugEnabled()) {
3813 LOG.debug("Flush requested on " + this);
3814 }
3815 }
3816
3817
3818
3819
3820
3821 private boolean isFlushSize(final long size) {
3822 return size > this.memstoreFlushSize;
3823 }
3824
3825
3826
3827
3828
3829
3830
3831
3832
3833
3834
3835
3836
3837
3838
3839
3840
3841
3842
3843
3844
3845
3846
3847
3848
3849
3850
3851
3852
3853
3854
3855
3856
3857
3858
3859 protected long replayRecoveredEditsIfAny(final Path regiondir,
3860 Map<byte[], Long> maxSeqIdInStores,
3861 final CancelableProgressable reporter, final MonitoredTask status)
3862 throws IOException {
3863 long minSeqIdForTheRegion = -1;
3864 for (Long maxSeqIdInStore : maxSeqIdInStores.values()) {
3865 if (maxSeqIdInStore < minSeqIdForTheRegion || minSeqIdForTheRegion == -1) {
3866 minSeqIdForTheRegion = maxSeqIdInStore;
3867 }
3868 }
3869 long seqid = minSeqIdForTheRegion;
3870
3871 FileSystem fs = this.fs.getFileSystem();
3872 NavigableSet<Path> files = WALSplitter.getSplitEditFilesSorted(fs, regiondir);
3873 if (LOG.isDebugEnabled()) {
3874 LOG.debug("Found " + (files == null ? 0 : files.size())
3875 + " recovered edits file(s) under " + regiondir);
3876 }
3877
3878 if (files == null || files.isEmpty()) return seqid;
3879
3880 for (Path edits: files) {
3881 if (edits == null || !fs.exists(edits)) {
3882 LOG.warn("Null or non-existent edits file: " + edits);
3883 continue;
3884 }
3885 if (isZeroLengthThenDelete(fs, edits)) continue;
3886
3887 long maxSeqId;
3888 String fileName = edits.getName();
3889 maxSeqId = Math.abs(Long.parseLong(fileName));
3890 if (maxSeqId <= minSeqIdForTheRegion) {
3891 if (LOG.isDebugEnabled()) {
3892 String msg = "Maximum sequenceid for this wal is " + maxSeqId
3893 + " and minimum sequenceid for the region is " + minSeqIdForTheRegion
3894 + ", skipped the whole file, path=" + edits;
3895 LOG.debug(msg);
3896 }
3897 continue;
3898 }
3899
3900 try {
3901
3902
3903 seqid = Math.max(seqid, replayRecoveredEdits(edits, maxSeqIdInStores, reporter));
3904 } catch (IOException e) {
3905 boolean skipErrors = conf.getBoolean(
3906 HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS,
3907 conf.getBoolean(
3908 "hbase.skip.errors",
3909 HConstants.DEFAULT_HREGION_EDITS_REPLAY_SKIP_ERRORS));
3910 if (conf.get("hbase.skip.errors") != null) {
3911 LOG.warn(
3912 "The property 'hbase.skip.errors' has been deprecated. Please use " +
3913 HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS + " instead.");
3914 }
3915 if (skipErrors) {
3916 Path p = WALSplitter.moveAsideBadEditsFile(fs, edits);
3917 LOG.error(HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS
3918 + "=true so continuing. Renamed " + edits +
3919 " as " + p, e);
3920 } else {
3921 throw e;
3922 }
3923 }
3924 }
3925
3926
3927 if (this.rsAccounting != null) {
3928 this.rsAccounting.clearRegionReplayEditsSize(getRegionInfo().getRegionName());
3929 }
3930 if (seqid > minSeqIdForTheRegion) {
3931
3932 internalFlushcache(null, seqid, stores.values(), status, false);
3933 }
3934
3935 if (files.size() > 0 && this.conf.getBoolean("hbase.region.archive.recovered.edits", false)) {
3936
3937
3938
3939 String fakeFamilyName = WALSplitter.getRegionDirRecoveredEditsDir(regiondir).getName();
3940 Set<StoreFile> fakeStoreFiles = new HashSet<StoreFile>(files.size());
3941 for (Path file: files) {
3942 fakeStoreFiles.add(new StoreFile(getRegionFileSystem().getFileSystem(), file, this.conf,
3943 null, null));
3944 }
3945 getRegionFileSystem().removeStoreFiles(fakeFamilyName, fakeStoreFiles);
3946 } else {
3947 for (Path file: files) {
3948 if (!fs.delete(file, false)) {
3949 LOG.error("Failed delete of " + file);
3950 } else {
3951 LOG.debug("Deleted recovered.edits file=" + file);
3952 }
3953 }
3954 }
3955 return seqid;
3956 }
3957
3958
3959
3960
3961
3962
3963
3964
3965
3966
/**
 * Replays the entries of a single recovered-edits file into this region's stores.
 *
 * <p>Each entry read from the WAL reader is reported to the nonce manager (when one is
 * available), offered to the coprocessor pre-restore hook, and then applied cell by cell
 * through {@code restoreEdit}. Cells are skipped when they belong to
 * {@code WALEdit.METAFAMILY}, when no store exists for their family, or when the entry's
 * sequence id is not greater than the value recorded for the store in
 * {@code maxSeqIdInStores}.
 *
 * @param edits path of the recovered-edits file
 * @param maxSeqIdInStores maximum sequence id per store, keyed by family name
 * @param reporter progress callback; may be null
 * @return the highest WAL sequence number accepted while reading the file, or -1 if no
 *         entry was read
 * @throws IOException if reading or applying fails for a reason other than EOF or an
 *         IOException caused by a {@link ParseException}, or if the reporter signals
 *         failure
 */
private long replayRecoveredEdits(final Path edits,
    Map<byte[], Long> maxSeqIdInStores, final CancelableProgressable reporter)
  throws IOException {
  String msg = "Replaying edits from " + edits;
  LOG.info(msg);
  MonitoredTask status = TaskMonitor.get().createStatus(msg);
  FileSystem fs = this.fs.getFileSystem();

  status.setStatus("Opening recovered edits");
  WAL.Reader reader = null;
  try {
    reader = WALFactory.createReader(fs, edits, conf);
    long currentEditSeqId = -1;
    long currentReplaySeqId = -1;
    long firstSeqIdInLog = -1;
    long skippedEdits = 0;
    long editsCount = 0;
    long intervalEdits = 0;
    WAL.Entry entry;
    // Store of the most recently handled cell, reused while the family stays the same.
    Store store = null;
    boolean reported_once = false;
    ServerNonceManager ng = this.rsServices == null ? null : this.rsServices.getNonceManager();

    try {
      // Number of edits after which the elapsed time is checked.
      int interval = this.conf.getInt("hbase.hstore.report.interval.edits", 2000);
      // Minimum time between two progress reports, compared against currentTime().
      int period = this.conf.getInt("hbase.hstore.report.period", 300000);
      long lastReport = EnvironmentEdgeManager.currentTime();

      while ((entry = reader.next()) != null) {
        WALKey key = entry.getKey();
        WALEdit val = entry.getEdit();

        if (ng != null) {
          ng.reportOperationFromWal(key.getNonceGroup(), key.getNonce(), key.getWriteTime());
        }

        if (reporter != null) {
          intervalEdits += val.size();
          if (intervalEdits >= interval) {
            // Enough edits accumulated: reset the counter and check the clock.
            intervalEdits = 0;
            long cur = EnvironmentEdgeManager.currentTime();
            if (lastReport + period <= cur) {
              status.setStatus("Replaying edits..." +
                  " skipped=" + skippedEdits +
                  " edits=" + editsCount);
              // A false return from the reporter aborts the replay.
              if(!reporter.progress()) {
                msg = "Progressable reporter failed, stopping replay";
                LOG.warn(msg);
                status.abort(msg);
                throw new IOException(msg);
              }
              reported_once = true;
              lastReport = cur;
            }
          }
        }

        if (firstSeqIdInLog == -1) {
          firstSeqIdInLog = key.getLogSeqNum();
        }
        if (currentEditSeqId > key.getLogSeqNum()) {
          // Sequence ids went backwards: log it and keep the previous, larger value.
          LOG.error(getRegionInfo().getEncodedName() + " : "
              + "Found decreasing SeqId. PreId=" + currentEditSeqId + " key=" + key
              + "; edit=" + val);
        } else {
          currentEditSeqId = key.getLogSeqNum();
        }
        // Prefer the original sequence number when the key carries a positive one.
        currentReplaySeqId = (key.getOrigLogSeqNum() > 0) ?
          key.getOrigLogSeqNum() : currentEditSeqId;

        if (coprocessorHost != null) {
          status.setStatus("Running pre-WAL-restore hook in coprocessors");
          if (coprocessorHost.preWALRestore(this.getRegionInfo(), key, val)) {
            // The hook returned true: skip this entry entirely.
            continue;
          }
        }

        // Entries written for a different region are counted as skipped.
        if (!Bytes.equals(key.getEncodedRegionName(),
          this.getRegionInfo().getEncodedNameAsBytes())) {
          skippedEdits++;
          continue;
        }

        boolean flush = false;
        for (Cell cell: val.getCells()) {
          // Cells in METAFAMILY are not applied as data; a compaction descriptor, when one
          // can be extracted from the cell, is replayed instead.
          if (CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) {
            CompactionDescriptor compaction = WALEdit.getCompaction(cell);
            if (compaction != null) {
              replayWALCompactionMarker(compaction, false, true, Long.MAX_VALUE);
            }
            skippedEdits++;
            continue;
          }
          // Look the store up again only when the family differs from the cached store.
          if (store == null || !CellUtil.matchingFamily(cell, store.getFamily().getName())) {
            store = getStore(cell);
          }
          if (store == null) {
            // No store for the cell's family in this region.
            LOG.warn("No family for " + cell);
            skippedEdits++;
            continue;
          }
          // Skip cells whose entry is not newer than what is recorded for the store.
          if (key.getLogSeqNum() <= maxSeqIdInStores.get(store.getFamily()
              .getName())) {
            skippedEdits++;
            continue;
          }
          CellUtil.setSequenceId(cell, currentReplaySeqId);

          // restoreEdit's return value is OR-ed in to decide whether to flush afterwards.
          flush |= restoreEdit(store, cell);
          editsCount++;
        }
        if (flush) {
          internalFlushcache(null, currentEditSeqId, stores.values(), status, false);
        }

        if (coprocessorHost != null) {
          coprocessorHost.postWALRestore(this.getRegionInfo(), key, val);
        }
      }
    } catch (EOFException eof) {
      // EOF: move the file aside, mark the status aborted and fall through to the summary.
      Path p = WALSplitter.moveAsideBadEditsFile(fs, edits);
      msg = "Encountered EOF. Most likely due to Master failure during " +
          "wal splitting, so we have this data in another edit.  " +
          "Continuing, but renaming " + edits + " as " + p;
      LOG.warn(msg, eof);
      status.abort(msg);
    } catch (IOException ioe) {
      // An IOException caused by a ParseException is treated as file corruption: the file
      // is moved aside and processing continues. Anything else is rethrown.
      if (ioe.getCause() instanceof ParseException) {
        Path p = WALSplitter.moveAsideBadEditsFile(fs, edits);
        msg = "File corruption encountered!  " +
            "Continuing, but renaming " + edits + " as " + p;
        LOG.warn(msg, ioe);
        status.setStatus(msg);
      } else {
        status.abort(StringUtils.stringifyException(ioe));
        throw ioe;
      }
    }
    // Make sure the reporter is invoked at least once per file.
    if (reporter != null && !reported_once) {
      reporter.progress();
    }
    msg = "Applied " + editsCount + ", skipped " + skippedEdits +
      ", firstSequenceIdInLog=" + firstSeqIdInLog +
      ", maxSequenceIdInLog=" + currentEditSeqId + ", path=" + edits;
    status.markComplete(msg);
    LOG.debug(msg);
    return currentEditSeqId;
  } finally {
    status.cleanup();
    if (reader != null) {
       reader.close();
    }
  }
}
4145
4146
4147
4148
4149
4150
4151 void replayWALCompactionMarker(CompactionDescriptor compaction, boolean pickCompactionFiles,
4152 boolean removeFiles, long replaySeqId)
4153 throws IOException {
4154 checkTargetRegion(compaction.getEncodedRegionName().toByteArray(),
4155 "Compaction marker from WAL ", compaction);
4156
4157 synchronized (writestate) {
4158 if (replaySeqId < lastReplayedOpenRegionSeqId) {
4159 LOG.warn(getRegionInfo().getEncodedName() + " : "
4160 + "Skipping replaying compaction event :" + TextFormat.shortDebugString(compaction)
4161 + " because its sequence id " + replaySeqId + " is smaller than this regions "
4162 + "lastReplayedOpenRegionSeqId of " + lastReplayedOpenRegionSeqId);
4163 return;
4164 }
4165 if (replaySeqId < lastReplayedCompactionSeqId) {
4166 LOG.warn(getRegionInfo().getEncodedName() + " : "
4167 + "Skipping replaying compaction event :" + TextFormat.shortDebugString(compaction)
4168 + " because its sequence id " + replaySeqId + " is smaller than this regions "
4169 + "lastReplayedCompactionSeqId of " + lastReplayedCompactionSeqId);
4170 return;
4171 } else {
4172 lastReplayedCompactionSeqId = replaySeqId;
4173 }
4174
4175 if (LOG.isDebugEnabled()) {
4176 LOG.debug(getRegionInfo().getEncodedName() + " : "
4177 + "Replaying compaction marker " + TextFormat.shortDebugString(compaction)
4178 + " with seqId=" + replaySeqId + " and lastReplayedOpenRegionSeqId="
4179 + lastReplayedOpenRegionSeqId);
4180 }
4181
4182 startRegionOperation(Operation.REPLAY_EVENT);
4183 try {
4184 Store store = this.getStore(compaction.getFamilyName().toByteArray());
4185 if (store == null) {
4186 LOG.warn(getRegionInfo().getEncodedName() + " : "
4187 + "Found Compaction WAL edit for deleted family:"
4188 + Bytes.toString(compaction.getFamilyName().toByteArray()));
4189 return;
4190 }
4191 store.replayCompactionMarker(compaction, pickCompactionFiles, removeFiles);
4192 logRegionFiles();
4193 } catch (FileNotFoundException ex) {
4194 LOG.warn(getRegionInfo().getEncodedName() + " : "
4195 + "At least one of the store files in compaction: "
4196 + TextFormat.shortDebugString(compaction)
4197 + " doesn't exist any more. Skip loading the file(s)", ex);
4198 } finally {
4199 closeRegionOperation(Operation.REPLAY_EVENT);
4200 }
4201 }
4202 }
4203
4204 void replayWALFlushMarker(FlushDescriptor flush, long replaySeqId) throws IOException {
4205 checkTargetRegion(flush.getEncodedRegionName().toByteArray(),
4206 "Flush marker from WAL ", flush);
4207
4208 if (ServerRegionReplicaUtil.isDefaultReplica(this.getRegionInfo())) {
4209 return;
4210 }
4211
4212 if (LOG.isDebugEnabled()) {
4213 LOG.debug(getRegionInfo().getEncodedName() + " : "
4214 + "Replaying flush marker " + TextFormat.shortDebugString(flush));
4215 }
4216
4217 startRegionOperation(Operation.REPLAY_EVENT);
4218 try {
4219 FlushAction action = flush.getAction();
4220 switch (action) {
4221 case START_FLUSH:
4222 replayWALFlushStartMarker(flush);
4223 break;
4224 case COMMIT_FLUSH:
4225 replayWALFlushCommitMarker(flush);
4226 break;
4227 case ABORT_FLUSH:
4228 replayWALFlushAbortMarker(flush);
4229 break;
4230 case CANNOT_FLUSH:
4231 replayWALFlushCannotFlushMarker(flush, replaySeqId);
4232 break;
4233 default:
4234 LOG.warn(getRegionInfo().getEncodedName() + " : " +
4235 "Received a flush event with unknown action, ignoring. " +
4236 TextFormat.shortDebugString(flush));
4237 break;
4238 }
4239
4240 logRegionFiles();
4241 } finally {
4242 closeRegionOperation(Operation.REPLAY_EVENT);
4243 }
4244 }
4245
4246
4247
4248
4249
/**
 * Replays a flush-start marker by preparing a flush for the stores it names.
 *
 * <p>Stores named in the marker but not found in this region are logged and left out. The
 * rest of the work runs while holding the {@code writestate} monitor. If no flush is
 * currently marked as in progress, {@code internalPrepareFlushCache} is called and its
 * result is returned; otherwise the marker is logged and ignored.
 *
 * @param flush the flush descriptor carrying the flush sequence number and store list
 * @return the result of {@code internalPrepareFlushCache} when a prepare was attempted;
 *         null when the marker was skipped or ignored
 * @throws IOException if preparing the flush fails
 */
@VisibleForTesting
PrepareFlushResult replayWALFlushStartMarker(FlushDescriptor flush) throws IOException {
  long flushSeqId = flush.getFlushSequenceNumber();

  HashSet<Store> storesToFlush = new HashSet<Store>();
  for (StoreFlushDescriptor storeFlush : flush.getStoreFlushesList()) {
    byte[] family = storeFlush.getFamilyName().toByteArray();
    Store store = getStore(family);
    if (store == null) {
      LOG.warn(getRegionInfo().getEncodedName() + " : "
        + "Received a flush start marker from primary, but the family is not found. Ignoring"
        + " StoreFlushDescriptor:" + TextFormat.shortDebugString(storeFlush));
      continue;
    }
    storesToFlush.add(store);
  }

  MonitoredTask status = TaskMonitor.get().createStatus("Preparing flush " + this);

  // Everything below inspects or changes writestate, so it runs under its monitor.
  synchronized (writestate) {
    try {
      // Markers older than the last replayed region-open event are dropped.
      if (flush.getFlushSequenceNumber() < lastReplayedOpenRegionSeqId) {
        LOG.warn(getRegionInfo().getEncodedName() + " : "
            + "Skipping replaying flush event :" + TextFormat.shortDebugString(flush)
            + " because its sequence id is smaller than this regions lastReplayedOpenRegionSeqId "
            + " of " + lastReplayedOpenRegionSeqId);
        return null;
      }
      // Reset the counters kept for mutations that skipped the WAL.
      if (numMutationsWithoutWAL.get() > 0) {
        numMutationsWithoutWAL.set(0);
        dataInMemoryWithoutWAL.set(0);
      }

      if (!writestate.flushing) {
        // No flush in progress: prepare one for the collected stores.
        PrepareFlushResult prepareResult = internalPrepareFlushCache(null,
          flushSeqId, storesToFlush, status, false);
        if (prepareResult.result == null) {
          // A null result field is treated as a successful prepare; remember it.
          this.writestate.flushing = true;
          this.prepareFlushResult = prepareResult;
          status.markComplete("Flush prepare successful");
          if (LOG.isDebugEnabled()) {
            LOG.debug(getRegionInfo().getEncodedName() + " : "
                + " Prepared flush with seqId:" + flush.getFlushSequenceNumber());
          }
        } else {
          // An empty memstore is still recorded as a prepared (empty) flush.
          if (prepareResult.getResult().getResult() ==
                FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY) {
            this.writestate.flushing = true;
            this.prepareFlushResult = prepareResult;
            if (LOG.isDebugEnabled()) {
              LOG.debug(getRegionInfo().getEncodedName() + " : "
                + " Prepared empty flush with seqId:" + flush.getFlushSequenceNumber());
            }
          }
          // NOTE(review): the status is aborted for every non-null result, including the
          // empty-memstore case handled just above — confirm this is intended.
          status.abort("Flush prepare failed with " + prepareResult.result);
        }
        return prepareResult;
      } else {
        // A flush is already marked in progress: compare sequence ids only to pick the
        // log message. All three branches just log a warning; the method then returns null.
        if (flush.getFlushSequenceNumber() == this.prepareFlushResult.flushOpSeqId) {
          LOG.warn(getRegionInfo().getEncodedName() + " : "
              + "Received a flush prepare marker with the same seqId: " +
              + flush.getFlushSequenceNumber() + " before clearing the previous one with seqId: "
              + prepareFlushResult.flushOpSeqId + ". Ignoring");
        } else if (flush.getFlushSequenceNumber() < this.prepareFlushResult.flushOpSeqId) {
          LOG.warn(getRegionInfo().getEncodedName() + " : "
              + "Received a flush prepare marker with a smaller seqId: " +
              + flush.getFlushSequenceNumber() + " before clearing the previous one with seqId: "
              + prepareFlushResult.flushOpSeqId + ". Ignoring");
        } else {
          LOG.warn(getRegionInfo().getEncodedName() + " : "
              + "Received a flush prepare marker with a larger seqId: " +
              + flush.getFlushSequenceNumber() + " before clearing the previous one with seqId: "
              + prepareFlushResult.flushOpSeqId + ". Ignoring");
        }
      }
    } finally {
      status.cleanup();
      // Wake up any thread waiting on the writestate monitor.
      writestate.notifyAll();
    }
  }
  return null;
}
4360
/**
 * Replays a flush-commit marker: picks up the flushed files in the stores and, depending
 * on how the marker's sequence id compares with the currently prepared flush, drops the
 * prepared memstore snapshot.
 *
 * <p>The work runs while holding the {@code writestate} monitor. A
 * {@link FileNotFoundException} raised inside is logged and swallowed. After the monitor
 * is released, threads waiting on this object are notified.
 *
 * @param flush the flush descriptor carrying the flush sequence number and store list
 * @throws IOException if replaying the flush in the stores fails for another reason
 */
@VisibleForTesting
void replayWALFlushCommitMarker(FlushDescriptor flush) throws IOException {
  MonitoredTask status = TaskMonitor.get().createStatus("Committing flush " + this);

  // Everything below inspects or changes writestate, so it runs under its monitor.
  synchronized (writestate) {
    try {
      // Markers older than the last replayed region-open event are dropped.
      if (flush.getFlushSequenceNumber() < lastReplayedOpenRegionSeqId) {
        LOG.warn(getRegionInfo().getEncodedName() + " : "
          + "Skipping replaying flush event :" + TextFormat.shortDebugString(flush)
          + " because its sequence id is smaller than this regions lastReplayedOpenRegionSeqId "
          + " of " + lastReplayedOpenRegionSeqId);
        return;
      }

      if (writestate.flushing) {
        PrepareFlushResult prepareFlushResult = this.prepareFlushResult;
        if (flush.getFlushSequenceNumber() == prepareFlushResult.flushOpSeqId) {
          if (LOG.isDebugEnabled()) {
            LOG.debug(getRegionInfo().getEncodedName() + " : "
                + "Received a flush commit marker with seqId:" + flush.getFlushSequenceNumber()
                + " and a previous prepared snapshot was found");
          }
          // Commit matches the prepared flush: pick up the files and drop the snapshot.
          replayFlushInStores(flush, prepareFlushResult, true);

          // Subtract the flushed size from the memstore size accounting.
          this.addAndGetGlobalMemstoreSize(-prepareFlushResult.totalFlushableSize);

          this.prepareFlushResult = null;
          writestate.flushing = false;
        } else if (flush.getFlushSequenceNumber() < prepareFlushResult.flushOpSeqId) {
          // Commit is older than the prepared flush: pick up the files but keep the
          // prepared snapshot and leave the flushing state untouched.
          LOG.warn(getRegionInfo().getEncodedName() + " : "
              + "Received a flush commit marker with smaller seqId: "
              + flush.getFlushSequenceNumber() + " than what we have prepared with seqId: "
              + prepareFlushResult.flushOpSeqId + ". Picking up new file, but not dropping"
              +"  prepared memstore snapshot");
          replayFlushInStores(flush, prepareFlushResult, false);

        } else {
          // Commit is newer than the prepared flush: pick up the files, drop the snapshot
          // and additionally drop memstore contents for the marker's sequence id.
          LOG.warn(getRegionInfo().getEncodedName() + " : "
              + "Received a flush commit marker with larger seqId: "
              + flush.getFlushSequenceNumber() + " than what we have prepared with seqId: " +
              prepareFlushResult.flushOpSeqId + ". Picking up new file and dropping prepared"
              +" memstore snapshot");

          replayFlushInStores(flush, prepareFlushResult, true);

          // Subtract the flushed size from the memstore size accounting.
          this.addAndGetGlobalMemstoreSize(-prepareFlushResult.totalFlushableSize);

          dropMemstoreContentsForSeqId(flush.getFlushSequenceNumber(), null);

          this.prepareFlushResult = null;
          writestate.flushing = false;
        }

        // Reads are (re-)enabled after any commit handled while a flush was prepared.
        this.setReadsEnabled(true);
      } else {
        LOG.warn(getRegionInfo().getEncodedName() + " : "
            + "Received a flush commit marker with seqId:" + flush.getFlushSequenceNumber()
            + ", but no previous prepared snapshot was found");
        // No prepared flush: pick up the files without dropping any snapshot.
        replayFlushInStores(flush, null, false);

        dropMemstoreContentsForSeqId(flush.getFlushSequenceNumber(), null);
      }

      status.markComplete("Flush commit successful");

      // Record the marker's sequence number as the latest flushed sequence id.
      this.maxFlushedSeqId = flush.getFlushSequenceNumber();

      getMVCC().advanceMemstoreReadPointIfNeeded(flush.getFlushSequenceNumber());

    } catch (FileNotFoundException ex) {
      LOG.warn(getRegionInfo().getEncodedName() + " : "
          + "At least one of the store files in flush: " + TextFormat.shortDebugString(flush)
          + " doesn't exist any more. Skip loading the file(s)", ex);
    }
    finally {
      status.cleanup();
      // Wake up any thread waiting on the writestate monitor.
      writestate.notifyAll();
    }
  }

  // Wake up any thread waiting on this region object.
  synchronized (this) {
    notifyAll();
  }
}
4483
4484
4485
4486
4487
4488
4489
4490
4491
4492 private void replayFlushInStores(FlushDescriptor flush, PrepareFlushResult prepareFlushResult,
4493 boolean dropMemstoreSnapshot)
4494 throws IOException {
4495 for (StoreFlushDescriptor storeFlush : flush.getStoreFlushesList()) {
4496 byte[] family = storeFlush.getFamilyName().toByteArray();
4497 Store store = getStore(family);
4498 if (store == null) {
4499 LOG.warn(getRegionInfo().getEncodedName() + " : "
4500 + "Received a flush commit marker from primary, but the family is not found."
4501 + "Ignoring StoreFlushDescriptor:" + storeFlush);
4502 continue;
4503 }
4504 List<String> flushFiles = storeFlush.getFlushOutputList();
4505 StoreFlushContext ctx = null;
4506 long startTime = EnvironmentEdgeManager.currentTime();
4507 if (prepareFlushResult == null || prepareFlushResult.storeFlushCtxs == null) {
4508 ctx = store.createFlushContext(flush.getFlushSequenceNumber());
4509 } else {
4510 ctx = prepareFlushResult.storeFlushCtxs.get(family);
4511 startTime = prepareFlushResult.startTime;
4512 }
4513
4514 if (ctx == null) {
4515 LOG.warn(getRegionInfo().getEncodedName() + " : "
4516 + "Unexpected: flush commit marker received from store "
4517 + Bytes.toString(family) + " but no associated flush context. Ignoring");
4518 continue;
4519 }
4520
4521 ctx.replayFlush(flushFiles, dropMemstoreSnapshot);
4522
4523
4524 this.lastStoreFlushTimeMap.put(store, startTime);
4525 }
4526 }
4527
4528
4529
4530
4531
4532
4533
4534 private long dropMemstoreContentsForSeqId(long seqId, Store store) throws IOException {
4535 long totalFreedSize = 0;
4536 this.updatesLock.writeLock().lock();
4537 try {
4538 mvcc.waitForPreviousTransactionsComplete();
4539 long currentSeqId = getSequenceId().get();
4540 if (seqId >= currentSeqId) {
4541
4542 LOG.info(getRegionInfo().getEncodedName() + " : "
4543 + "Dropping memstore contents as well since replayed flush seqId: "
4544 + seqId + " is greater than current seqId:" + currentSeqId);
4545
4546
4547 if (store == null ) {
4548 for (Store s : stores.values()) {
4549 totalFreedSize += doDropStoreMemstoreContentsForSeqId(s, currentSeqId);
4550 }
4551 } else {
4552 totalFreedSize += doDropStoreMemstoreContentsForSeqId(store, currentSeqId);
4553 }
4554 } else {
4555 LOG.info(getRegionInfo().getEncodedName() + " : "
4556 + "Not dropping memstore contents since replayed flush seqId: "
4557 + seqId + " is smaller than current seqId:" + currentSeqId);
4558 }
4559 } finally {
4560 this.updatesLock.writeLock().unlock();
4561 }
4562 return totalFreedSize;
4563 }
4564
4565 private long doDropStoreMemstoreContentsForSeqId(Store s, long currentSeqId) throws IOException {
4566 long snapshotSize = s.getFlushableSize();
4567 this.addAndGetGlobalMemstoreSize(-snapshotSize);
4568 StoreFlushContext ctx = s.createFlushContext(currentSeqId);
4569 ctx.prepare();
4570 ctx.abort();
4571 return snapshotSize;
4572 }
4573
4574 private void replayWALFlushAbortMarker(FlushDescriptor flush) {
4575
4576
4577
4578 }
4579
4580 private void replayWALFlushCannotFlushMarker(FlushDescriptor flush, long replaySeqId) {
4581 synchronized (writestate) {
4582 if (this.lastReplayedOpenRegionSeqId > replaySeqId) {
4583 LOG.warn(getRegionInfo().getEncodedName() + " : "
4584 + "Skipping replaying flush event :" + TextFormat.shortDebugString(flush)
4585 + " because its sequence id " + replaySeqId + " is smaller than this regions "
4586 + "lastReplayedOpenRegionSeqId of " + lastReplayedOpenRegionSeqId);
4587 return;
4588 }
4589
4590
4591
4592
4593
4594
4595 this.setReadsEnabled(true);
4596 }
4597 }
4598
4599 @VisibleForTesting
4600 PrepareFlushResult getPrepareFlushResult() {
4601 return prepareFlushResult;
4602 }
4603
/**
 * Replays a region event marker. Only REGION_OPEN events are acted upon: for each store
 * listed in the event the store files are refreshed, prepared flush state that the event
 * supersedes is aborted, and memstore contents are dropped where the sequence ids allow it.
 * Nothing is done on the default replica, for REGION_CLOSE events, or for unknown event types.
 *
 * @param regionEvent the region event descriptor to replay
 * @throws IOException if the marker targets a different region (see checkTargetRegion) or a
 *           store operation fails
 */
void replayWALRegionEventMarker(RegionEventDescriptor regionEvent) throws IOException {
  checkTargetRegion(regionEvent.getEncodedRegionName().toByteArray(),
    "RegionEvent marker from WAL ", regionEvent);

  startRegionOperation(Operation.REPLAY_EVENT);
  try {
    if (ServerRegionReplicaUtil.isDefaultReplica(this.getRegionInfo())) {
      return; // nothing to replay on the default replica
    }

    if (regionEvent.getEventType() == EventType.REGION_CLOSE) {
      // Close events are ignored.
      return;
    }
    if (regionEvent.getEventType() != EventType.REGION_OPEN) {
      LOG.warn(getRegionInfo().getEncodedName() + " : "
          + "Unknown region event received, ignoring :"
          + TextFormat.shortDebugString(regionEvent));
      return;
    }

    if (LOG.isDebugEnabled()) {
      LOG.debug(getRegionInfo().getEncodedName() + " : "
        + "Replaying region open event marker " + TextFormat.shortDebugString(regionEvent));
    }

    // All state changes below happen under the writestate monitor.
    synchronized (writestate) {
      // Remember the newest region-open sequence id seen; older open events are skipped.
      // The flush and bulk-load replay paths in this file compare against this value.
      if (this.lastReplayedOpenRegionSeqId <= regionEvent.getLogSequenceNumber()) {
        this.lastReplayedOpenRegionSeqId = regionEvent.getLogSequenceNumber();
      } else {
        LOG.warn(getRegionInfo().getEncodedName() + " : "
          + "Skipping replaying region event :" + TextFormat.shortDebugString(regionEvent)
          + " because its sequence id is smaller than this regions lastReplayedOpenRegionSeqId "
          + " of " + lastReplayedOpenRegionSeqId);
        return;
      }

      // Process each store named in the event.
      for (StoreDescriptor storeDescriptor : regionEvent.getStoresList()) {
        // Look up the store for this family; unknown families are skipped.
        byte[] family = storeDescriptor.getFamilyName().toByteArray();
        Store store = getStore(family);
        if (store == null) {
          LOG.warn(getRegionInfo().getEncodedName() + " : "
              + "Received a region open marker from primary, but the family is not found. "
              + "Ignoring. StoreDescriptor:" + storeDescriptor);
          continue;
        }

        // Max sequence id of the store BEFORE its files are refreshed.
        long storeSeqId = store.getMaxSequenceId();
        List<String> storeFiles = storeDescriptor.getStoreFileList();
        try {
          store.refreshStoreFiles(storeFiles);
        } catch (FileNotFoundException ex) {
          LOG.warn(getRegionInfo().getEncodedName() + " : "
                  + "At least one of the store files: " + storeFiles
                  + " doesn't exist any more. Skip loading the file(s)", ex);
          continue;
        }
        if (store.getMaxSequenceId() != storeSeqId) {
          // The refresh changed the store's max sequence id: record a new flush time.
          lastStoreFlushTimeMap.put(store, EnvironmentEdgeManager.currentTime());
        }

        if (writestate.flushing) {
          // A prepared flush is outstanding; abort this store's part of it when the prepared
          // flush is not newer than the region-open event.
          if (this.prepareFlushResult.flushOpSeqId <= regionEvent.getLogSequenceNumber()) {
            StoreFlushContext ctx = this.prepareFlushResult.storeFlushCtxs == null ?
                null : this.prepareFlushResult.storeFlushCtxs.get(family);
            if (ctx != null) {
              long snapshotSize = store.getFlushableSize();
              ctx.abort();
              this.addAndGetGlobalMemstoreSize(-snapshotSize);
              this.prepareFlushResult.storeFlushCtxs.remove(family);
            }
          }
        }

        // Drop memstore contents if the event's sequence id is not behind the region's
        // current sequence id (the check lives in dropMemstoreContentsForSeqId).
        dropMemstoreContentsForSeqId(regionEvent.getLogSequenceNumber(), store);
        // NOTE(review): storeSeqId is the value read before refreshStoreFiles(); confirm that
        // using the pre-refresh value for maxFlushedSeqId is intended.
        if (storeSeqId > this.maxFlushedSeqId) {
          this.maxFlushedSeqId = storeSeqId;
        }
      }

      // Clear the prepared flush entirely if no store still holds a snapshot.
      dropPrepareFlushIfPossible();

      // Advance the MVCC read point up to the max flushed sequence id if needed.
      getMVCC().advanceMemstoreReadPointIfNeeded(this.maxFlushedSeqId);

      // A region-open event has been replayed: enable reads.
      this.setReadsEnabled(true);

      // Wake up threads waiting on this region's monitor.
      synchronized (this) {
        notifyAll();
      }
    }
    logRegionFiles();
  } finally {
    closeRegionOperation(Operation.REPLAY_EVENT);
  }
}
4721
/**
 * Replays a bulk load marker: each store file listed in the event is bulk loaded into the
 * matching store. Nothing is done on the default replica, and events that are not newer than
 * the last replayed region-open event are skipped.
 *
 * @param bulkLoadEvent the bulk load descriptor to replay
 * @throws IOException if the marker targets a different region (see checkTargetRegion) or a
 *           store operation fails
 */
void replayWALBulkLoadEventMarker(WALProtos.BulkLoadDescriptor bulkLoadEvent) throws IOException {
  checkTargetRegion(bulkLoadEvent.getEncodedRegionName().toByteArray(),
    "BulkLoad marker from WAL ", bulkLoadEvent);

  if (ServerRegionReplicaUtil.isDefaultReplica(this.getRegionInfo())) {
    return; // nothing to replay on the default replica
  }

  if (LOG.isDebugEnabled()) {
    LOG.debug(getRegionInfo().getEncodedName() + " : "
            +  "Replaying bulkload event marker " + TextFormat.shortDebugString(bulkLoadEvent));
  }
  // Determine whether the event touches more than one column family; the result is passed
  // to startBulkRegionOperation below.
  boolean multipleFamilies = false;
  byte[] family = null;
  for (StoreDescriptor storeDescriptor : bulkLoadEvent.getStoresList()) {
    byte[] fam = storeDescriptor.getFamilyName().toByteArray();
    if (family == null) {
      family = fam;
    } else if (!Bytes.equals(family, fam)) {
      multipleFamilies = true;
      break;
    }
  }

  startBulkRegionOperation(multipleFamilies);
  try {
    // The skip check and the file loading happen under the writestate monitor.
    synchronized (writestate) {
      // Skip events whose (non-negative) bulk load sequence number is not greater than the
      // last replayed region-open sequence id.
      // NOTE(review): the log text says "smaller than" but an equal sequence id is skipped too.
      if (bulkLoadEvent.getBulkloadSeqNum() >= 0
          && this.lastReplayedOpenRegionSeqId >= bulkLoadEvent.getBulkloadSeqNum()) {
        LOG.warn(getRegionInfo().getEncodedName() + " : "
            + "Skipping replaying bulkload event :"
            + TextFormat.shortDebugString(bulkLoadEvent)
            + " because its sequence id is smaller than this region's lastReplayedOpenRegionSeqId"
            + " =" + lastReplayedOpenRegionSeqId);

        return;
      }

      for (StoreDescriptor storeDescriptor : bulkLoadEvent.getStoresList()) {
        // Look up the store for this family; unknown families are skipped.
        family = storeDescriptor.getFamilyName().toByteArray();
        Store store = getStore(family);
        if (store == null) {
          LOG.warn(getRegionInfo().getEncodedName() + " : "
                  + "Received a bulk load marker from primary, but the family is not found. "
                  + "Ignoring. StoreDescriptor:" + storeDescriptor);
          continue;
        }

        List<String> storeFiles = storeDescriptor.getStoreFileList();
        for (String storeFile : storeFiles) {
          StoreFileInfo storeFileInfo = null;
          try {
            storeFileInfo = fs.getStoreFileInfo(Bytes.toString(family), storeFile);
            store.bulkLoadHFile(storeFileInfo);
          } catch(FileNotFoundException ex) {
            // A missing file is logged and skipped; remaining files are still attempted.
            LOG.warn(getRegionInfo().getEncodedName() + " : "
                    + ((storeFileInfo != null) ? storeFileInfo.toString() :
                          (new Path(Bytes.toString(family), storeFile)).toString())
                    + " doesn't exist any more. Skip loading the file");
          }
        }
      }
    }
    // Only strictly positive sequence numbers advance the MVCC read point.
    if (bulkLoadEvent.getBulkloadSeqNum() > 0) {
      getMVCC().advanceMemstoreReadPointIfNeeded(bulkLoadEvent.getBulkloadSeqNum());
    }
  } finally {
    closeBulkRegionOperation();
  }
}
4801
4802
4803
4804
4805 private void dropPrepareFlushIfPossible() {
4806 if (writestate.flushing) {
4807 boolean canDrop = true;
4808 if (prepareFlushResult.storeFlushCtxs != null) {
4809 for (Entry<byte[], StoreFlushContext> entry
4810 : prepareFlushResult.storeFlushCtxs.entrySet()) {
4811 Store store = getStore(entry.getKey());
4812 if (store == null) {
4813 continue;
4814 }
4815 if (store.getSnapshotSize() > 0) {
4816 canDrop = false;
4817 break;
4818 }
4819 }
4820 }
4821
4822
4823
4824 if (canDrop) {
4825 writestate.flushing = false;
4826 this.prepareFlushResult = null;
4827 }
4828 }
4829 }
4830
/**
 * Refreshes the store files of every store and, where a store's max sequence id advanced,
 * aborts the matching part of an outstanding prepared flush and drops memstore contents.
 * Does nothing (returns {@code false}) on the default replica.
 *
 * @return {@code true} if the accumulated freed size is greater than zero
 * @throws IOException propagated from the store operations
 */
@Override
public boolean refreshStoreFiles() throws IOException {
  if (ServerRegionReplicaUtil.isDefaultReplica(this.getRegionInfo())) {
    return false; // nothing to refresh on the default replica
  }

  if (LOG.isDebugEnabled()) {
    LOG.debug(getRegionInfo().getEncodedName() + " : "
        + "Refreshing store files to see whether we can free up memstore");
  }

  // Running total of the sizes released below.
  long totalFreedSize = 0;

  // Smallest post-refresh max sequence id across all stores.
  long smallestSeqIdInStores = Long.MAX_VALUE;

  startRegionOperation();
  try {
    synchronized (writestate) {
      for (Store store : getStores()) {
        // Max sequence id before the refresh, used to detect whether new files were picked up.
        long maxSeqIdBefore = store.getMaxSequenceId();

        // Re-read the store's files.
        store.refreshStoreFiles();

        long storeSeqId = store.getMaxSequenceId();
        if (storeSeqId < smallestSeqIdInStores) {
          smallestSeqIdInStores = storeSeqId;
        }

        // Only act when the refresh advanced the store's max sequence id.
        if (storeSeqId > maxSeqIdBefore) {

          if (writestate.flushing) {
            // Abort this store's prepared flush context if the prepared flush is not newer
            // than what the store files now contain.
            if (this.prepareFlushResult.flushOpSeqId <= storeSeqId) {
              StoreFlushContext ctx = this.prepareFlushResult.storeFlushCtxs == null ?
                  null : this.prepareFlushResult.storeFlushCtxs.get(store.getFamily().getName());
              if (ctx != null) {
                long snapshotSize = store.getFlushableSize();
                ctx.abort();
                this.addAndGetGlobalMemstoreSize(-snapshotSize);
                this.prepareFlushResult.storeFlushCtxs.remove(store.getFamily().getName());
                totalFreedSize += snapshotSize;
              }
            }
          }

          // Drop memstore contents if the store's sequence id allows it.
          totalFreedSize += dropMemstoreContentsForSeqId(storeSeqId, store);
        }
      }

      // Clear the prepared flush entirely if no store still holds a snapshot.
      dropPrepareFlushIfPossible();

      // Advance the MVCC read point to each store's max memstore timestamp if needed.
      for (Store s : getStores()) {
        getMVCC().advanceMemstoreReadPointIfNeeded(s.getMaxMemstoreTS());
      }

      // Raise lastReplayedOpenRegionSeqId to the smallest sequence id seen in the stores.
      // NOTE(review): with zero stores smallestSeqIdInStores stays Long.MAX_VALUE and would be
      // assigned here; confirm a region always has at least one store.
      if (this.lastReplayedOpenRegionSeqId < smallestSeqIdInStores) {
        this.lastReplayedOpenRegionSeqId = smallestSeqIdInStores;
      }
    }

    // Wake up threads waiting on this region's monitor.
    synchronized (this) {
      notifyAll();
    }
    return totalFreedSize > 0;
  } finally {
    closeRegionOperation();
  }
}
4914
4915 private void logRegionFiles() {
4916 if (LOG.isTraceEnabled()) {
4917 LOG.trace(getRegionInfo().getEncodedName() + " : Store files for region: ");
4918 for (Store s : stores.values()) {
4919 Collection<StoreFile> storeFiles = s.getStorefiles();
4920 if (storeFiles == null) continue;
4921 for (StoreFile sf : storeFiles) {
4922 LOG.trace(getRegionInfo().getEncodedName() + " : " + sf);
4923 }
4924 }
4925 }
4926 }
4927
4928
4929
4930
4931 private void checkTargetRegion(byte[] encodedRegionName, String exceptionMsg, Object payload)
4932 throws WrongRegionException {
4933 if (Bytes.equals(this.getRegionInfo().getEncodedNameAsBytes(), encodedRegionName)) {
4934 return;
4935 }
4936
4937 if (!RegionReplicaUtil.isDefaultReplica(this.getRegionInfo()) &&
4938 Bytes.equals(encodedRegionName,
4939 this.fs.getRegionInfoForFS().getEncodedNameAsBytes())) {
4940 return;
4941 }
4942
4943 throw new WrongRegionException(exceptionMsg + payload
4944 + " targetted for region " + Bytes.toStringBinary(encodedRegionName)
4945 + " does not match this region: " + this.getRegionInfo());
4946 }
4947
4948
4949
4950
4951
4952
4953
4954 protected boolean restoreEdit(final Store s, final Cell cell) {
4955 long kvSize = s.add(cell).getFirst();
4956 if (this.rsAccounting != null) {
4957 rsAccounting.addAndGetRegionReplayEditsSize(getRegionInfo().getRegionName(), kvSize);
4958 }
4959 return isFlushSize(this.addAndGetGlobalMemstoreSize(kvSize));
4960 }
4961
4962
4963
4964
4965
4966
4967
4968 private static boolean isZeroLengthThenDelete(final FileSystem fs, final Path p)
4969 throws IOException {
4970 FileStatus stat = fs.getFileStatus(p);
4971 if (stat.getLen() > 0) return false;
4972 LOG.warn("File " + p + " is zero-length, deleting.");
4973 fs.delete(p, false);
4974 return true;
4975 }
4976
4977 protected HStore instantiateHStore(final HColumnDescriptor family) throws IOException {
4978 return new HStore(this, family, this.conf);
4979 }
4980
4981 @Override
4982 public Store getStore(final byte[] column) {
4983 return this.stores.get(column);
4984 }
4985
4986 public ConcurrentHashMap<HashedBytes, RowLockContext> getLockedRows() {
4987 return lockedRows;
4988 }
4989
4990
4991
4992
4993
4994 private Store getStore(Cell cell) {
4995 for (Map.Entry<byte[], Store> famStore : stores.entrySet()) {
4996 if (Bytes.equals(
4997 cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(),
4998 famStore.getKey(), 0, famStore.getKey().length)) {
4999 return famStore.getValue();
5000 }
5001 }
5002
5003 return null;
5004 }
5005
5006 @Override
5007 public List<Store> getStores() {
5008 List<Store> list = new ArrayList<Store>(stores.size());
5009 list.addAll(stores.values());
5010 return list;
5011 }
5012
5013 @Override
5014 public List<String> getStoreFileList(final byte [][] columns)
5015 throws IllegalArgumentException {
5016 List<String> storeFileNames = new ArrayList<String>();
5017 synchronized(closeLock) {
5018 for(byte[] column : columns) {
5019 Store store = this.stores.get(column);
5020 if (store == null) {
5021 throw new IllegalArgumentException("No column family : " +
5022 new String(column) + " available");
5023 }
5024 Collection<StoreFile> storeFiles = store.getStorefiles();
5025 if (storeFiles == null) continue;
5026 for (StoreFile storeFile: storeFiles) {
5027 storeFileNames.add(storeFile.getPath().toString());
5028 }
5029
5030 logRegionFiles();
5031 }
5032 }
5033 return storeFileNames;
5034 }
5035
5036
5037
5038
5039
5040
5041 void checkRow(final byte [] row, String op) throws IOException {
5042 if (!rowIsInRange(getRegionInfo(), row)) {
5043 throw new WrongRegionException("Requested row out of range for " +
5044 op + " on HRegion " + this + ", startKey='" +
5045 Bytes.toStringBinary(getRegionInfo().getStartKey()) + "', getEndKey()='" +
5046 Bytes.toStringBinary(getRegionInfo().getEndKey()) + "', row='" +
5047 Bytes.toStringBinary(row) + "'");
5048 }
5049 }
5050
5051 @Override
5052 public RowLock getRowLock(byte[] row, boolean waitForLock) throws IOException {
5053 startRegionOperation();
5054 try {
5055 return getRowLockInternal(row, waitForLock);
5056 } finally {
5057 closeRegionOperation();
5058 }
5059 }
5060
5061
5062
5063
5064
5065 protected RowLock getRowLockInternal(byte[] row, boolean waitForLock) throws IOException {
5066 HashedBytes rowKey = new HashedBytes(row);
5067 RowLockContext rowLockContext = new RowLockContext(rowKey);
5068
5069
5070 while (true) {
5071 RowLockContext existingContext = lockedRows.putIfAbsent(rowKey, rowLockContext);
5072 if (existingContext == null) {
5073
5074 break;
5075 } else if (existingContext.ownedByCurrentThread()) {
5076
5077 rowLockContext = existingContext;
5078 break;
5079 } else {
5080 if (!waitForLock) {
5081 return null;
5082 }
5083 TraceScope traceScope = null;
5084 try {
5085 if (Trace.isTracing()) {
5086 traceScope = Trace.startSpan("HRegion.getRowLockInternal");
5087 }
5088
5089 if (!existingContext.latch.await(this.rowLockWaitDuration, TimeUnit.MILLISECONDS)) {
5090 if(traceScope != null) {
5091 traceScope.getSpan().addTimelineAnnotation("Failed to get row lock");
5092 }
5093 throw new IOException("Timed out waiting for lock for row: " + rowKey);
5094 }
5095 if (traceScope != null) traceScope.close();
5096 traceScope = null;
5097 } catch (InterruptedException ie) {
5098 LOG.warn("Thread interrupted waiting for lock on row: " + rowKey);
5099 InterruptedIOException iie = new InterruptedIOException();
5100 iie.initCause(ie);
5101 throw iie;
5102 } finally {
5103 if (traceScope != null) traceScope.close();
5104 }
5105 }
5106 }
5107
5108
5109 return rowLockContext.newLock();
5110 }
5111
5112
5113
5114
5115
5116
5117
5118 public RowLock getRowLock(byte[] row) throws IOException {
5119 return getRowLock(row, true);
5120 }
5121
5122 @Override
5123 public void releaseRowLocks(List<RowLock> rowLocks) {
5124 if (rowLocks != null) {
5125 for (RowLock rowLock : rowLocks) {
5126 rowLock.release();
5127 }
5128 rowLocks.clear();
5129 }
5130 }
5131
5132
5133
5134
5135
5136
5137
5138 private static boolean hasMultipleColumnFamilies(Collection<Pair<byte[], String>> familyPaths) {
5139 boolean multipleFamilies = false;
5140 byte[] family = null;
5141 for (Pair<byte[], String> pair : familyPaths) {
5142 byte[] fam = pair.getFirst();
5143 if (family == null) {
5144 family = fam;
5145 } else if (!Bytes.equals(family, fam)) {
5146 multipleFamilies = true;
5147 break;
5148 }
5149 }
5150 return multipleFamilies;
5151 }
5152
5153 @Override
5154 public boolean bulkLoadHFiles(Collection<Pair<byte[], String>> familyPaths, boolean assignSeqId,
5155 BulkLoadListener bulkLoadListener) throws IOException {
5156 long seqId = -1;
5157 Map<byte[], List<Path>> storeFiles = new TreeMap<byte[], List<Path>>(Bytes.BYTES_COMPARATOR);
5158 Preconditions.checkNotNull(familyPaths);
5159
5160 startBulkRegionOperation(hasMultipleColumnFamilies(familyPaths));
5161 try {
5162 this.writeRequestsCount.increment();
5163
5164
5165
5166
5167 List<IOException> ioes = new ArrayList<IOException>();
5168 List<Pair<byte[], String>> failures = new ArrayList<Pair<byte[], String>>();
5169 for (Pair<byte[], String> p : familyPaths) {
5170 byte[] familyName = p.getFirst();
5171 String path = p.getSecond();
5172
5173 Store store = getStore(familyName);
5174 if (store == null) {
5175 IOException ioe = new org.apache.hadoop.hbase.DoNotRetryIOException(
5176 "No such column family " + Bytes.toStringBinary(familyName));
5177 ioes.add(ioe);
5178 } else {
5179 try {
5180 store.assertBulkLoadHFileOk(new Path(path));
5181 } catch (WrongRegionException wre) {
5182
5183 failures.add(p);
5184 } catch (IOException ioe) {
5185
5186 ioes.add(ioe);
5187 }
5188 }
5189 }
5190
5191
5192 if (ioes.size() != 0) {
5193 IOException e = MultipleIOException.createIOException(ioes);
5194 LOG.error("There were one or more IO errors when checking if the bulk load is ok.", e);
5195 throw e;
5196 }
5197
5198
5199 if (failures.size() != 0) {
5200 StringBuilder list = new StringBuilder();
5201 for (Pair<byte[], String> p : failures) {
5202 list.append("\n").append(Bytes.toString(p.getFirst())).append(" : ")
5203 .append(p.getSecond());
5204 }
5205
5206 LOG.warn("There was a recoverable bulk load failure likely due to a" +
5207 " split. These (family, HFile) pairs were not loaded: " + list);
5208 return false;
5209 }
5210
5211
5212
5213
5214
5215
5216 if (assignSeqId) {
5217 FlushResult fs = flushcache(true, false);
5218 if (fs.isFlushSucceeded()) {
5219 seqId = ((FlushResultImpl)fs).flushSequenceId;
5220 } else if (fs.getResult() == FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY) {
5221 seqId = ((FlushResultImpl)fs).flushSequenceId;
5222 } else {
5223 throw new IOException("Could not bulk load with an assigned sequential ID because the "+
5224 "flush didn't run. Reason for not flushing: " + ((FlushResultImpl)fs).failureReason);
5225 }
5226 }
5227
5228 for (Pair<byte[], String> p : familyPaths) {
5229 byte[] familyName = p.getFirst();
5230 String path = p.getSecond();
5231 Store store = getStore(familyName);
5232 try {
5233 String finalPath = path;
5234 if (bulkLoadListener != null) {
5235 finalPath = bulkLoadListener.prepareBulkLoad(familyName, path);
5236 }
5237 Path commitedStoreFile = store.bulkLoadHFile(finalPath, seqId);
5238
5239 if(storeFiles.containsKey(familyName)) {
5240 storeFiles.get(familyName).add(commitedStoreFile);
5241 } else {
5242 List<Path> storeFileNames = new ArrayList<Path>();
5243 storeFileNames.add(commitedStoreFile);
5244 storeFiles.put(familyName, storeFileNames);
5245 }
5246 if (bulkLoadListener != null) {
5247 bulkLoadListener.doneBulkLoad(familyName, path);
5248 }
5249 } catch (IOException ioe) {
5250
5251
5252
5253
5254 LOG.error("There was a partial failure due to IO when attempting to" +
5255 " load " + Bytes.toString(p.getFirst()) + " : " + p.getSecond(), ioe);
5256 if (bulkLoadListener != null) {
5257 try {
5258 bulkLoadListener.failedBulkLoad(familyName, path);
5259 } catch (Exception ex) {
5260 LOG.error("Error while calling failedBulkLoad for family " +
5261 Bytes.toString(familyName) + " with path " + path, ex);
5262 }
5263 }
5264 throw ioe;
5265 }
5266 }
5267
5268 return true;
5269 } finally {
5270 if (wal != null && !storeFiles.isEmpty()) {
5271
5272 try {
5273 WALProtos.BulkLoadDescriptor loadDescriptor = ProtobufUtil.toBulkLoadDescriptor(
5274 this.getRegionInfo().getTable(),
5275 ByteStringer.wrap(this.getRegionInfo().getEncodedNameAsBytes()), storeFiles, seqId);
5276 WALUtil.writeBulkLoadMarkerAndSync(wal, this.htableDescriptor, getRegionInfo(),
5277 loadDescriptor, sequenceId);
5278 } catch (IOException ioe) {
5279 if (this.rsServices != null) {
5280
5281
5282 this.rsServices.abort("Failed to write bulk load event into WAL.", ioe);
5283 }
5284 }
5285 }
5286
5287 closeBulkRegionOperation();
5288 }
5289 }
5290
5291 @Override
5292 public boolean equals(Object o) {
5293 return o instanceof HRegion && Bytes.equals(getRegionInfo().getRegionName(),
5294 ((HRegion) o).getRegionInfo().getRegionName());
5295 }
5296
5297 @Override
5298 public int hashCode() {
5299 return Bytes.hashCode(getRegionInfo().getRegionName());
5300 }
5301
5302 @Override
5303 public String toString() {
5304 return getRegionInfo().getRegionNameAsString();
5305 }
5306
5307
5308
5309
5310 class RegionScannerImpl implements RegionScanner {
5311
5312 KeyValueHeap storeHeap = null;
5313
5314
5315 KeyValueHeap joinedHeap = null;
5316
5317
5318
5319 protected Cell joinedContinuationRow = null;
5320 protected final byte[] stopRow;
5321 private final FilterWrapper filter;
5322 private ScannerContext defaultScannerContext;
5323 protected int isScan;
5324 private boolean filterClosed = false;
5325 private long readPt;
5326 private long maxResultSize;
5327 protected HRegion region;
5328
5329 @Override
5330 public HRegionInfo getRegionInfo() {
5331 return region.getRegionInfo();
5332 }
5333
/**
 * Builds a region scanner for the given scan: wraps the scan's filter, registers this
 * scanner's read point, opens one store scanner per requested family and builds the heaps.
 *
 * @param scan the scan specification
 * @param additionalScanners extra scanners to include in the main heap; may be {@code null}
 * @param region the region being scanned
 * @throws IOException if opening the store scanners or building the heaps fails; any other
 *           throwable from that step is wrapped in an {@code IOException}
 */
RegionScannerImpl(Scan scan, List<KeyValueScanner> additionalScanners, HRegion region)
    throws IOException {

  this.region = region;
  this.maxResultSize = scan.getMaxResultSize();
  if (scan.hasFilter()) {
    this.filter = new FilterWrapper(scan.getFilter());
  } else {
    this.filter = null;
  }

  // Context used by the next()/nextRaw() overloads that take no ScannerContext; it carries
  // only the scan's batch limit.
  defaultScannerContext = ScannerContext.newBuilder().setBatchLimit(scan.getBatch()).build();

  // An empty stop row on a non-get scan is stored as null.
  if (Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW) && !scan.isGetScan()) {
    this.stopRow = null;
  } else {
    this.stopRow = scan.getStopRow();
  }
  // -1 for a get scan, 0 otherwise.
  // NOTE(review): how isScan is consumed is not visible in this part of the file.
  this.isScan = scan.isGetScan() ? -1 : 0;

  // Determine the read point for the scan's isolation level and register it for this
  // scanner, both under the scannerReadPoints monitor.
  IsolationLevel isolationLevel = scan.getIsolationLevel();
  synchronized(scannerReadPoints) {
    this.readPt = getReadpoint(isolationLevel);
    scannerReadPoints.put(this, this.readPt);
  }

  // scanners feed the main heap, joinedScanners the joined heap.
  List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>();
  List<KeyValueScanner> joinedScanners = new ArrayList<KeyValueScanner>();
  // Every scanner obtained so far, so that they can be closed if construction fails.
  List<KeyValueScanner> instantiatedScanners = new ArrayList<KeyValueScanner>();

  if (additionalScanners != null && !additionalScanners.isEmpty()) {
    scanners.addAll(additionalScanners);
    instantiatedScanners.addAll(additionalScanners);
  }

  try {
    for (Map.Entry<byte[], NavigableSet<byte[]>> entry : scan.getFamilyMap().entrySet()) {
      Store store = stores.get(entry.getKey());
      KeyValueScanner scanner = store.getScanner(scan, entry.getValue(), this.readPt);
      instantiatedScanners.add(scanner);
      // A family goes to the joined heap only when there is a filter, on-demand column family
      // loading is requested, and the filter does not consider the family essential.
      if (this.filter == null || !scan.doLoadColumnFamiliesOnDemand()
          || this.filter.isFamilyEssential(entry.getKey())) {
        scanners.add(scanner);
      } else {
        joinedScanners.add(scanner);
      }
    }
    initializeKVHeap(scanners, joinedScanners, region);
  } catch (Throwable t) {
    // Undo the read point registration and close whatever was opened.
    throw handleException(instantiatedScanners, t);
  }
}
5398
5399 protected void initializeKVHeap(List<KeyValueScanner> scanners,
5400 List<KeyValueScanner> joinedScanners, HRegion region)
5401 throws IOException {
5402 this.storeHeap = new KeyValueHeap(scanners, region.comparator);
5403 if (!joinedScanners.isEmpty()) {
5404 this.joinedHeap = new KeyValueHeap(joinedScanners, region.comparator);
5405 }
5406 }
5407
5408 private IOException handleException(List<KeyValueScanner> instantiatedScanners,
5409 Throwable t) {
5410
5411 scannerReadPoints.remove(this);
5412 if (storeHeap != null) {
5413 storeHeap.close();
5414 storeHeap = null;
5415 if (joinedHeap != null) {
5416 joinedHeap.close();
5417 joinedHeap = null;
5418 }
5419 } else {
5420
5421 for (KeyValueScanner scanner : instantiatedScanners) {
5422 scanner.close();
5423 }
5424 }
5425 return t instanceof IOException ? (IOException) t : new IOException(t);
5426 }
5427
5428 @Override
5429 public long getMaxResultSize() {
5430 return maxResultSize;
5431 }
5432
5433 @Override
5434 public long getMvccReadPoint() {
5435 return this.readPt;
5436 }
5437
5438 @Override
5439 public int getBatch() {
5440 return this.defaultScannerContext.getBatchLimit();
5441 }
5442
5443
5444
5445
5446
5447
5448 protected void resetFilters() throws IOException {
5449 if (filter != null) {
5450 filter.reset();
5451 }
5452 }
5453
5454 @Override
5455 public boolean next(List<Cell> outResults)
5456 throws IOException {
5457
5458 return next(outResults, defaultScannerContext);
5459 }
5460
5461 @Override
5462 public synchronized boolean next(List<Cell> outResults, ScannerContext scannerContext) throws IOException {
5463 if (this.filterClosed) {
5464 throw new UnknownScannerException("Scanner was closed (timed out?) " +
5465 "after we renewed it. Could be caused by a very slow scanner " +
5466 "or a lengthy garbage collection");
5467 }
5468 startRegionOperation(Operation.SCAN);
5469 readRequestsCount.increment();
5470 try {
5471 return nextRaw(outResults, scannerContext);
5472 } finally {
5473 closeRegionOperation(Operation.SCAN);
5474 }
5475 }
5476
5477 @Override
5478 public boolean nextRaw(List<Cell> outResults) throws IOException {
5479
5480 return nextRaw(outResults, defaultScannerContext);
5481 }
5482
5483 @Override
5484 public boolean nextRaw(List<Cell> outResults, ScannerContext scannerContext)
5485 throws IOException {
5486 if (storeHeap == null) {
5487
5488 throw new UnknownScannerException("Scanner was closed");
5489 }
5490 boolean moreValues;
5491 if (outResults.isEmpty()) {
5492
5493
5494 moreValues = nextInternal(outResults, scannerContext);
5495 } else {
5496 List<Cell> tmpList = new ArrayList<Cell>();
5497 moreValues = nextInternal(tmpList, scannerContext);
5498 outResults.addAll(tmpList);
5499 }
5500
5501
5502
5503
5504 if (!scannerContext.midRowResultFormed()) resetFilters();
5505
5506 if (isFilterDoneInternal()) {
5507 moreValues = false;
5508 }
5509 return moreValues;
5510 }
5511
5512
5513
5514
5515 private boolean populateFromJoinedHeap(List<Cell> results, ScannerContext scannerContext)
5516 throws IOException {
5517 assert joinedContinuationRow != null;
5518 boolean moreValues =
5519 populateResult(results, this.joinedHeap, scannerContext,
5520 joinedContinuationRow.getRowArray(), joinedContinuationRow.getRowOffset(),
5521 joinedContinuationRow.getRowLength());
5522
5523 if (!scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {
5524
5525 joinedContinuationRow = null;
5526 }
5527
5528
5529 Collections.sort(results, comparator);
5530 return moreValues;
5531 }
5532
5533
5534
5535
5536
5537
5538
5539
5540
5541
5542
/**
 * Pulls cells from {@code heap} into {@code results} until the heap's next cell no longer
 * belongs to the given row or a batch, size or time limit is reached at BETWEEN_CELLS scope.
 *
 * @param results list that receives the cells
 * @param heap heap to read from
 * @param scannerContext limits and progress tracking for this call
 * @param currentRow byte array containing the current row key
 * @param offset offset of the row key within {@code currentRow}
 * @param length length of the row key
 * @return when a limit is reached, the {@code hasMoreValues()} of the scanner state that was
 *         set; otherwise {@code true} if the heap still has a next cell
 * @throws IOException propagated from the heap
 */
private boolean populateResult(List<Cell> results, KeyValueHeap heap,
    ScannerContext scannerContext, byte[] currentRow, int offset, short length)
    throws IOException {
  Cell nextKv;
  boolean moreCellsInRow = false;
  // Remember the caller's keep-progress setting so it can be restored after each heap call.
  boolean tmpKeepProgress = scannerContext.getKeepProgress();
  // Limits are checked between cells in this method.
  LimitScope limitScope = LimitScope.BETWEEN_CELLS;
  do {
    // Keep-progress is forced on for the heap call and restored to the caller's setting
    // right after it.
    scannerContext.setKeepProgress(true);
    heap.next(results, scannerContext);
    scannerContext.setKeepProgress(tmpKeepProgress);

    nextKv = heap.peek();
    moreCellsInRow = moreCellsInRow(nextKv, currentRow, offset, length);

    // The batch limit only stops the loop when the row has more cells; size and time limits
    // stop it regardless and pick the mid-row state when the row is unfinished.
    if (moreCellsInRow && scannerContext.checkBatchLimit(limitScope)) {
      return scannerContext.setScannerState(NextState.BATCH_LIMIT_REACHED).hasMoreValues();
    } else if (scannerContext.checkSizeLimit(limitScope)) {
      ScannerContext.NextState state =
          moreCellsInRow ? NextState.SIZE_LIMIT_REACHED_MID_ROW : NextState.SIZE_LIMIT_REACHED;
      return scannerContext.setScannerState(state).hasMoreValues();
    } else if (scannerContext.checkTimeLimit(limitScope)) {
      ScannerContext.NextState state =
          moreCellsInRow ? NextState.TIME_LIMIT_REACHED_MID_ROW : NextState.TIME_LIMIT_REACHED;
      return scannerContext.setScannerState(state).hasMoreValues();
    }
  } while (moreCellsInRow);

  // Row finished without hitting a limit: more values exist iff the heap has a next cell.
  return nextKv != null;
}
5577
5578
5579
5580
5581
5582
5583
5584
5585
5586
5587
5588 private boolean moreCellsInRow(final Cell nextKv, byte[] currentRow, int offset,
5589 short length) {
5590 return nextKv != null && CellUtil.matchingRow(nextKv, currentRow, offset, length);
5591 }
5592
5593
5594
5595
5596 @Override
5597 public synchronized boolean isFilterDone() throws IOException {
5598 return isFilterDoneInternal();
5599 }
5600
5601 private boolean isFilterDoneInternal() throws IOException {
5602 return this.filter != null && this.filter.filterAllRemaining();
5603 }
5604
/**
 * Core scan loop: fills {@code results} with cells for the next row (or partial row) and
 * records the outcome in {@code scannerContext}.
 *
 * <p>Each loop iteration looks at the row at the top of {@code storeHeap}, applies the row-key
 * and row-level filters, reads the row's cells via {@code populateResult}, and, when a joined
 * heap is present, continues with {@code populateFromJoinedHeap}. Rows that end up filtered out
 * or empty are skipped with {@code nextRow} and the loop continues.
 *
 * @param results output list; must be empty on entry
 * @param scannerContext tracks limits and progress and receives the resulting scanner state;
 *          must not be null
 * @return the {@code hasMoreValues()} of the state set on the context, or {@code true} when a
 *         limit was reached between cells
 * @throws IllegalArgumentException if {@code results} is not empty or the context is null
 * @throws IOException on read/filter failure, including {@code CallerDisconnectedException}
 *           when the RPC caller has gone away and {@code IncompatibleFilterException} when a
 *           filter with {@code hasFilterRow()} meets a mid-row limit
 */
private boolean nextInternal(List<Cell> results, ScannerContext scannerContext)
    throws IOException {
  if (!results.isEmpty()) {
    throw new IllegalArgumentException("First parameter should be an empty list");
  }
  if (scannerContext == null) {
    throw new IllegalArgumentException("Scanner context cannot be null");
  }
  RpcCallContext rpcCall = RpcServer.getCurrentCall();

  // Remember the progress the context had on entry so that it can be restored each time the
  // loop restarts on a new row (only used when the context asks to keep progress).
  int initialBatchProgress = scannerContext.getBatchProgress();
  long initialSizeProgress = scannerContext.getSizeProgress();
  long initialTimeProgress = scannerContext.getTimeProgress();

  // Loop until a row (or part of one) is returned or the scan is exhausted; "continue" is used
  // whenever the current row is dropped.
  while (true) {
    // Starting on a new row: progress made on a previously skipped row must not count, so
    // either restore the entry values or reset to zero.
    if (scannerContext.getKeepProgress()) {
      // Progress should be kept. Reset to initial values seen at start of method invocation.
      scannerContext
          .setProgress(initialBatchProgress, initialSizeProgress, initialTimeProgress);
    } else {
      scannerContext.clearProgress();
    }

    if (rpcCall != null) {
      // Abort early if the caller has disconnected; a non-negative value from
      // disconnectSince() is treated as "disconnected for that many ms".
      long afterTime = rpcCall.disconnectSince();
      if (afterTime >= 0) {
        throw new CallerDisconnectedException(
            "Aborting on region " + getRegionInfo().getRegionNameAsString() + ", call " +
                this + " after " + afterTime + " ms, since " +
                "caller disconnected");
      }
    }

    // Peek at the next cell to learn which row this iteration is working on.
    Cell current = this.storeHeap.peek();

    byte[] currentRow = null;
    int offset = 0;
    short length = 0;
    if (current != null) {
      currentRow = current.getRowArray();
      offset = current.getRowOffset();
      length = current.getRowLength();
    }

    // isStopRow also returns true for a null row, i.e. when the heap is exhausted.
    boolean stopRow = isStopRow(currentRow, offset, length);

    // A filter that wants to see whole rows (hasFilterRow) cannot work on partial rows.
    boolean hasFilterRow = this.filter != null && this.filter.hasFilterRow();

    // Widen the size and time limit scopes to row boundaries so those limits cannot cut a row
    // in half while such a filter is active.
    if (hasFilterRow) {
      if (LOG.isTraceEnabled()) {
        LOG.trace("filter#hasFilterRow is true which prevents partial results from being "
            + " formed. Changing scope of limits that may create partials");
      }
      scannerContext.setSizeLimitScope(LimitScope.BETWEEN_ROWS);
      scannerContext.setTimeLimitScope(LimitScope.BETWEEN_ROWS);
    }

    // joinedContinuationRow == null means we are not in the middle of reading the joined heap
    // for a previously started row.
    if (joinedContinuationRow == null) {
      // First, check if we are at a stop row. If so, there are no more results.
      if (stopRow) {
        if (hasFilterRow) {
          filter.filterRowCells(results);
        }
        return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
      }

      // Row key rejected by the filter: skip the whole row, unless the filter is done or
      // nextRow reports there is nothing more to scan.
      if (filterRowKey(currentRow, offset, length)) {
        boolean moreRows = !isFilterDoneInternal() && nextRow(currentRow, offset, length);
        if (!moreRows) {
          return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
        }
        results.clear();
        continue;
      }

      // Read the cells of the current row from the store heap.
      populateResult(results, this.storeHeap, scannerContext, currentRow, offset, length);

      // A limit hit between cells means we stopped mid-row, which a whole-row filter cannot
      // tolerate.
      if (scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {
        if (hasFilterRow) {
          throw new IncompatibleFilterException(
              "Filter whose hasFilterRow() returns true is incompatible with scans that must "
                  + " stop mid-row because of a limit. ScannerContext:" + scannerContext);
        }
        return true;
      }

      // Re-evaluate stopRow against the cell that follows the row just read.
      Cell nextKv = this.storeHeap.peek();
      stopRow = nextKv == null ||
          isStopRow(nextKv.getRowArray(), nextKv.getRowOffset(), nextKv.getRowLength());
      // save that the row was empty before filters applied to it.
      final boolean isEmptyRow = results.isEmpty();

      // We have the part of the row necessary for filtering (all of it, usually).
      // First filter with the filterRow(List).
      FilterWrapper.FilterRowRetCode ret = FilterWrapper.FilterRowRetCode.NOT_CALLED;
      if (hasFilterRow) {
        ret = filter.filterRowCellsWithRet(results);

        // The filter may have changed the contents of results, so recompute batch and size
        // progress from what is left while preserving the time progress recorded so far.
        long timeProgress = scannerContext.getTimeProgress();
        if (scannerContext.getKeepProgress()) {
          scannerContext.setProgress(initialBatchProgress, initialSizeProgress,
              initialTimeProgress);
        } else {
          scannerContext.clearProgress();
        }
        scannerContext.setTimeProgress(timeProgress);
        scannerContext.incrementBatchProgress(results.size());
        for (Cell cell : results) {
          scannerContext.incrementSizeProgress(CellUtil.estimatedHeapSizeOfWithoutTags(cell));
        }
      }

      // Drop the row when it was empty, the row-cells filter excluded it, or filterRow()
      // rejects it.
      if ((isEmptyRow || ret == FilterWrapper.FilterRowRetCode.EXCLUDE) || filterRow()) {
        results.clear();
        boolean moreRows = nextRow(currentRow, offset, length);
        if (!moreRows) {
          return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
        }

        // The row was dropped; keep scanning unless the following row is past the stop row.
        if (!stopRow) continue;
        return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
      }

      // The row passed the filters. If a joined heap exists and may hold data for this row,
      // remember the row in joinedContinuationRow and start reading from the joined heap.
      if (this.joinedHeap != null) {
        boolean mayHaveData = joinedHeapMayHaveData(currentRow, offset, length);
        if (mayHaveData) {
          joinedContinuationRow = current;
          populateFromJoinedHeap(results, scannerContext);

          if (scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {
            return true;
          }
        }
      }
    } else {
      // A previous call stopped while reading the joined heap; pick up from there.
      populateFromJoinedHeap(results, scannerContext);
      if (scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {
        return true;
      }
    }

    // joinedContinuationRow is still set here, so report that more values remain.
    // NOTE(review): presumably populateFromJoinedHeap clears it once the joined row is fully
    // read - confirm against that method.
    if (joinedContinuationRow != null) {
      return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
    }

    // Nothing was collected for this row (e.g. the joined read produced nothing either):
    // move on to the next row rather than returning an empty result.
    if (results.isEmpty()) {
      boolean moreRows = nextRow(currentRow, offset, length);
      if (!moreRows) {
        return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
      }
      if (!stopRow) continue;
    }

    // We are done. Report whether more rows may follow.
    if (stopRow) {
      return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
    } else {
      return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
    }
  }
}
5807
5808
5809
5810
5811
5812
5813
5814
5815 private boolean joinedHeapMayHaveData(byte[] currentRow, int offset, short length)
5816 throws IOException {
5817 Cell nextJoinedKv = joinedHeap.peek();
5818 boolean matchCurrentRow =
5819 nextJoinedKv != null && CellUtil.matchingRow(nextJoinedKv, currentRow, offset, length);
5820 boolean matchAfterSeek = false;
5821
5822
5823
5824 if (!matchCurrentRow) {
5825 Cell firstOnCurrentRow = KeyValueUtil.createFirstOnRow(currentRow, offset, length);
5826 boolean seekSuccessful = this.joinedHeap.requestSeek(firstOnCurrentRow, true, true);
5827 matchAfterSeek =
5828 seekSuccessful && joinedHeap.peek() != null
5829 && CellUtil.matchingRow(joinedHeap.peek(), currentRow, offset, length);
5830 }
5831
5832 return matchCurrentRow || matchAfterSeek;
5833 }
5834
5835
5836
5837
5838
5839
5840
5841
5842 private boolean filterRow() throws IOException {
5843
5844
5845 return filter != null && (!filter.hasFilterRow())
5846 && filter.filterRow();
5847 }
5848
5849 private boolean filterRowKey(byte[] row, int offset, short length) throws IOException {
5850 return filter != null
5851 && filter.filterRowKey(row, offset, length);
5852 }
5853
5854 protected boolean nextRow(byte [] currentRow, int offset, short length) throws IOException {
5855 assert this.joinedContinuationRow == null: "Trying to go to next row during joinedHeap read.";
5856 Cell next;
5857 while ((next = this.storeHeap.peek()) != null &&
5858 CellUtil.matchingRow(next, currentRow, offset, length)) {
5859 this.storeHeap.next(MOCKED_LIST);
5860 }
5861 resetFilters();
5862
5863 return this.region.getCoprocessorHost() == null
5864 || this.region.getCoprocessorHost()
5865 .postScannerFilterRow(this, currentRow, offset, length);
5866 }
5867
5868 protected boolean isStopRow(byte[] currentRow, int offset, short length) {
5869 return currentRow == null ||
5870 (stopRow != null &&
5871 comparator.compareRows(stopRow, 0, stopRow.length,
5872 currentRow, offset, length) <= isScan);
5873 }
5874
/**
 * Closes this scanner: closes and drops the store heap and the joined heap (when present),
 * removes this scanner from {@code scannerReadPoints}, and marks the filter as closed.
 * Each heap reference is nulled after it is closed, so a second call skips it.
 */
@Override
public synchronized void close() {
  if (storeHeap != null) {
    storeHeap.close();
    storeHeap = null;
  }
  if (joinedHeap != null) {
    joinedHeap.close();
    joinedHeap = null;
  }
  // Stop tracking this scanner's read point.
  scannerReadPoints.remove(this);
  this.filterClosed = true;
}
5889
5890 KeyValueHeap getStoreHeapForTesting() {
5891 return storeHeap;
5892 }
5893
5894 @Override
5895 public synchronized boolean reseek(byte[] row) throws IOException {
5896 if (row == null) {
5897 throw new IllegalArgumentException("Row cannot be null.");
5898 }
5899 boolean result = false;
5900 startRegionOperation();
5901 try {
5902 KeyValue kv = KeyValueUtil.createFirstOnRow(row);
5903
5904 result = this.storeHeap.requestSeek(kv, true, true);
5905 if (this.joinedHeap != null) {
5906 result = this.joinedHeap.requestSeek(kv, true, true) || result;
5907 }
5908 } finally {
5909 closeRegionOperation();
5910 }
5911 return result;
5912 }
5913 }
5914
5915
5916
5917
5918
5919
5920
5921
5922
5923
5924
5925
5926
5927
5928
5929
5930
5931
5932
5933
5934 static HRegion newHRegion(Path tableDir, WAL wal, FileSystem fs,
5935 Configuration conf, HRegionInfo regionInfo, final HTableDescriptor htd,
5936 RegionServerServices rsServices) {
5937 try {
5938 @SuppressWarnings("unchecked")
5939 Class<? extends HRegion> regionClass =
5940 (Class<? extends HRegion>) conf.getClass(HConstants.REGION_IMPL, HRegion.class);
5941
5942 Constructor<? extends HRegion> c =
5943 regionClass.getConstructor(Path.class, WAL.class, FileSystem.class,
5944 Configuration.class, HRegionInfo.class, HTableDescriptor.class,
5945 RegionServerServices.class);
5946
5947 return c.newInstance(tableDir, wal, fs, conf, regionInfo, htd, rsServices);
5948 } catch (Throwable e) {
5949
5950 throw new IllegalStateException("Could not instantiate a region instance.", e);
5951 }
5952 }
5953
5954
5955
5956
5957
5958
5959
5960
5961
5962
5963
5964
5965
5966
5967
5968
5969
5970 public static HRegion createHRegion(final HRegionInfo info, final Path rootDir,
5971 final Configuration conf, final HTableDescriptor hTableDescriptor)
5972 throws IOException {
5973 return createHRegion(info, rootDir, conf, hTableDescriptor, null);
5974 }
5975
5976
5977
5978
5979
5980
5981
5982
5983
5984
5985
5986 public static void closeHRegion(final HRegion r) throws IOException {
5987 if (r == null) return;
5988 r.close();
5989 if (r.getWAL() == null) return;
5990 r.getWAL().close();
5991 }
5992
5993
5994
5995
5996
5997
5998
5999
6000
6001
6002
6003
6004
6005
6006 public static HRegion createHRegion(final HRegionInfo info, final Path rootDir,
6007 final Configuration conf,
6008 final HTableDescriptor hTableDescriptor,
6009 final WAL wal,
6010 final boolean initialize)
6011 throws IOException {
6012 return createHRegion(info, rootDir, conf, hTableDescriptor,
6013 wal, initialize, false);
6014 }
6015
6016
6017
6018
6019
6020
6021
6022
6023
6024
6025
6026
6027
6028
6029
6030 public static HRegion createHRegion(final HRegionInfo info, final Path rootDir,
6031 final Configuration conf,
6032 final HTableDescriptor hTableDescriptor,
6033 final WAL wal,
6034 final boolean initialize, final boolean ignoreWAL)
6035 throws IOException {
6036 Path tableDir = FSUtils.getTableDir(rootDir, info.getTable());
6037 return createHRegion(info, rootDir, tableDir, conf, hTableDescriptor, wal, initialize,
6038 ignoreWAL);
6039 }
6040
6041
6042
6043
6044
6045
6046
6047
6048
6049
6050
6051
6052
6053
6054
6055
6056 public static HRegion createHRegion(final HRegionInfo info, final Path rootDir, final Path tableDir,
6057 final Configuration conf,
6058 final HTableDescriptor hTableDescriptor,
6059 final WAL wal,
6060 final boolean initialize, final boolean ignoreWAL)
6061 throws IOException {
6062 LOG.info("creating HRegion " + info.getTable().getNameAsString()
6063 + " HTD == " + hTableDescriptor + " RootDir = " + rootDir +
6064 " Table name == " + info.getTable().getNameAsString());
6065 FileSystem fs = FileSystem.get(conf);
6066 HRegionFileSystem.createRegionOnFileSystem(conf, fs, tableDir, info);
6067 WAL effectiveWAL = wal;
6068 if (wal == null && !ignoreWAL) {
6069
6070
6071
6072 Configuration confForWAL = new Configuration(conf);
6073 confForWAL.set(HConstants.HBASE_DIR, rootDir.toString());
6074 effectiveWAL = (new WALFactory(confForWAL,
6075 Collections.<WALActionsListener>singletonList(new MetricsWAL()),
6076 "hregion-" + RandomStringUtils.randomNumeric(8))).
6077 getWAL(info.getEncodedNameAsBytes());
6078 }
6079 HRegion region = HRegion.newHRegion(tableDir,
6080 effectiveWAL, fs, conf, info, hTableDescriptor, null);
6081 if (initialize) {
6082
6083
6084 region.setSequenceId(region.initialize(null));
6085 }
6086 return region;
6087 }
6088
6089 public static HRegion createHRegion(final HRegionInfo info, final Path rootDir,
6090 final Configuration conf,
6091 final HTableDescriptor hTableDescriptor,
6092 final WAL wal)
6093 throws IOException {
6094 return createHRegion(info, rootDir, conf, hTableDescriptor, wal, true);
6095 }
6096
6097
6098
6099
6100
6101
6102
6103
6104
6105
6106
6107
6108
6109 public static HRegion openHRegion(final HRegionInfo info,
6110 final HTableDescriptor htd, final WAL wal,
6111 final Configuration conf)
6112 throws IOException {
6113 return openHRegion(info, htd, wal, conf, null, null);
6114 }
6115
6116
6117
6118
6119
6120
6121
6122
6123
6124
6125
6126
6127
6128
6129
6130
6131 public static HRegion openHRegion(final HRegionInfo info,
6132 final HTableDescriptor htd, final WAL wal, final Configuration conf,
6133 final RegionServerServices rsServices,
6134 final CancelableProgressable reporter)
6135 throws IOException {
6136 return openHRegion(FSUtils.getRootDir(conf), info, htd, wal, conf, rsServices, reporter);
6137 }
6138
6139
6140
6141
6142
6143
6144
6145
6146
6147
6148
6149
6150
6151
6152 public static HRegion openHRegion(Path rootDir, final HRegionInfo info,
6153 final HTableDescriptor htd, final WAL wal, final Configuration conf)
6154 throws IOException {
6155 return openHRegion(rootDir, info, htd, wal, conf, null, null);
6156 }
6157
6158
6159
6160
6161
6162
6163
6164
6165
6166
6167
6168
6169
6170
6171
6172
6173 public static HRegion openHRegion(final Path rootDir, final HRegionInfo info,
6174 final HTableDescriptor htd, final WAL wal, final Configuration conf,
6175 final RegionServerServices rsServices,
6176 final CancelableProgressable reporter)
6177 throws IOException {
6178 FileSystem fs = null;
6179 if (rsServices != null) {
6180 fs = rsServices.getFileSystem();
6181 }
6182 if (fs == null) {
6183 fs = FileSystem.get(conf);
6184 }
6185 return openHRegion(conf, fs, rootDir, info, htd, wal, rsServices, reporter);
6186 }
6187
6188
6189
6190
6191
6192
6193
6194
6195
6196
6197
6198
6199
6200
6201
6202 public static HRegion openHRegion(final Configuration conf, final FileSystem fs,
6203 final Path rootDir, final HRegionInfo info, final HTableDescriptor htd, final WAL wal)
6204 throws IOException {
6205 return openHRegion(conf, fs, rootDir, info, htd, wal, null, null);
6206 }
6207
6208
6209
6210
6211
6212
6213
6214
6215
6216
6217
6218
6219
6220
6221
6222
6223
6224 public static HRegion openHRegion(final Configuration conf, final FileSystem fs,
6225 final Path rootDir, final HRegionInfo info, final HTableDescriptor htd, final WAL wal,
6226 final RegionServerServices rsServices, final CancelableProgressable reporter)
6227 throws IOException {
6228 Path tableDir = FSUtils.getTableDir(rootDir, info.getTable());
6229 return openHRegion(conf, fs, rootDir, tableDir, info, htd, wal, rsServices, reporter);
6230 }
6231
6232
6233
6234
6235
6236
6237
6238
6239
6240
6241
6242
6243
6244
6245
6246
6247
6248 public static HRegion openHRegion(final Configuration conf, final FileSystem fs,
6249 final Path rootDir, final Path tableDir, final HRegionInfo info, final HTableDescriptor htd,
6250 final WAL wal, final RegionServerServices rsServices,
6251 final CancelableProgressable reporter)
6252 throws IOException {
6253 if (info == null) throw new NullPointerException("Passed region info is null");
6254 if (LOG.isDebugEnabled()) {
6255 LOG.debug("Opening region: " + info);
6256 }
6257 HRegion r = HRegion.newHRegion(tableDir, wal, fs, conf, info, htd, rsServices);
6258 return r.openHRegion(reporter);
6259 }
6260
6261
6262
6263
6264
6265
6266
6267
6268
6269 public static HRegion openHRegion(final HRegion other, final CancelableProgressable reporter)
6270 throws IOException {
6271 HRegionFileSystem regionFs = other.getRegionFileSystem();
6272 HRegion r = newHRegion(regionFs.getTableDir(), other.getWAL(), regionFs.getFileSystem(),
6273 other.baseConf, other.getRegionInfo(), other.getTableDesc(), null);
6274 return r.openHRegion(reporter);
6275 }
6276
6277 public static Region openHRegion(final Region other, final CancelableProgressable reporter)
6278 throws IOException {
6279 return openHRegion((HRegion)other, reporter);
6280 }
6281
6282
6283
6284
6285
6286
6287
6288 protected HRegion openHRegion(final CancelableProgressable reporter)
6289 throws IOException {
6290
6291 checkCompressionCodecs();
6292
6293
6294 checkEncryption();
6295
6296 checkClassLoading();
6297 this.openSeqNum = initialize(reporter);
6298 this.setSequenceId(openSeqNum);
6299 if (wal != null && getRegionServerServices() != null && !writestate.readOnly
6300 && !isRecovering) {
6301
6302
6303
6304 writeRegionOpenMarker(wal, openSeqNum);
6305 }
6306 return this;
6307 }
6308
6309 public static void warmupHRegion(final HRegionInfo info,
6310 final HTableDescriptor htd, final WAL wal, final Configuration conf,
6311 final RegionServerServices rsServices,
6312 final CancelableProgressable reporter)
6313 throws IOException {
6314
6315 if (info == null) throw new NullPointerException("Passed region info is null");
6316
6317 if (LOG.isDebugEnabled()) {
6318 LOG.debug("HRegion.Warming up region: " + info);
6319 }
6320
6321 Path rootDir = FSUtils.getRootDir(conf);
6322 Path tableDir = FSUtils.getTableDir(rootDir, info.getTable());
6323
6324 FileSystem fs = null;
6325 if (rsServices != null) {
6326 fs = rsServices.getFileSystem();
6327 }
6328 if (fs == null) {
6329 fs = FileSystem.get(conf);
6330 }
6331
6332 HRegion r = HRegion.newHRegion(tableDir, wal, fs, conf, info, htd, rsServices);
6333 r.initializeWarmup(reporter);
6334 r.close();
6335 }
6336
6337
6338 private void checkCompressionCodecs() throws IOException {
6339 for (HColumnDescriptor fam: this.htableDescriptor.getColumnFamilies()) {
6340 CompressionTest.testCompression(fam.getCompression());
6341 CompressionTest.testCompression(fam.getCompactionCompression());
6342 }
6343 }
6344
6345 private void checkEncryption() throws IOException {
6346 for (HColumnDescriptor fam: this.htableDescriptor.getColumnFamilies()) {
6347 EncryptionTest.testEncryption(conf, fam.getEncryptionType(), fam.getEncryptionKey());
6348 }
6349 }
6350
6351 private void checkClassLoading() throws IOException {
6352 RegionSplitPolicy.getSplitPolicyClass(this.htableDescriptor, conf);
6353 RegionCoprocessorHost.testTableCoprocessorAttrs(conf, this.htableDescriptor);
6354 }
6355
6356
6357
6358
6359
6360
6361 HRegion createDaughterRegionFromSplits(final HRegionInfo hri) throws IOException {
6362
6363 fs.commitDaughterRegion(hri);
6364
6365
6366 HRegion r = HRegion.newHRegion(this.fs.getTableDir(), this.getWAL(), fs.getFileSystem(),
6367 this.getBaseConf(), hri, this.getTableDesc(), rsServices);
6368 r.readRequestsCount.set(this.getReadRequestsCount() / 2);
6369 r.writeRequestsCount.set(this.getWriteRequestsCount() / 2);
6370 return r;
6371 }
6372
6373
6374
6375
6376
6377
6378
6379 HRegion createMergedRegionFromMerges(final HRegionInfo mergedRegionInfo,
6380 final HRegion region_b) throws IOException {
6381 HRegion r = HRegion.newHRegion(this.fs.getTableDir(), this.getWAL(),
6382 fs.getFileSystem(), this.getBaseConf(), mergedRegionInfo,
6383 this.getTableDesc(), this.rsServices);
6384 r.readRequestsCount.set(this.getReadRequestsCount()
6385 + region_b.getReadRequestsCount());
6386 r.writeRequestsCount.set(this.getWriteRequestsCount()
6387
6388 + region_b.getWriteRequestsCount());
6389 this.fs.commitMergedRegion(mergedRegionInfo);
6390 return r;
6391 }
6392
6393
6394
6395
6396
6397
6398
6399
6400
6401
6402
6403
6404 public static void addRegionToMETA(final HRegion meta, final HRegion r) throws IOException {
6405 meta.checkResources();
6406
6407 byte[] row = r.getRegionInfo().getRegionName();
6408 final long now = EnvironmentEdgeManager.currentTime();
6409 final List<Cell> cells = new ArrayList<Cell>(2);
6410 cells.add(new KeyValue(row, HConstants.CATALOG_FAMILY,
6411 HConstants.REGIONINFO_QUALIFIER, now,
6412 r.getRegionInfo().toByteArray()));
6413
6414 cells.add(new KeyValue(row, HConstants.CATALOG_FAMILY,
6415 HConstants.META_VERSION_QUALIFIER, now,
6416 Bytes.toBytes(HConstants.META_VERSION)));
6417 meta.put(row, HConstants.CATALOG_FAMILY, cells);
6418 }
6419
6420
6421
6422
6423
6424
6425
6426
6427 @Deprecated
6428 public static Path getRegionDir(final Path tabledir, final String name) {
6429 return new Path(tabledir, name);
6430 }
6431
6432
6433
6434
6435
6436
6437
6438
6439 @Deprecated
6440 @VisibleForTesting
6441 public static Path getRegionDir(final Path rootdir, final HRegionInfo info) {
6442 return new Path(
6443 FSUtils.getTableDir(rootdir, info.getTable()), info.getEncodedName());
6444 }
6445
6446
6447
6448
6449
6450
6451
6452
6453
6454 public static boolean rowIsInRange(HRegionInfo info, final byte [] row) {
6455 return ((info.getStartKey().length == 0) ||
6456 (Bytes.compareTo(info.getStartKey(), row) <= 0)) &&
6457 ((info.getEndKey().length == 0) ||
6458 (Bytes.compareTo(info.getEndKey(), row) > 0));
6459 }
6460
6461
6462
6463
6464
6465
6466
6467 public static HRegion mergeAdjacent(final HRegion srcA, final HRegion srcB)
6468 throws IOException {
6469 HRegion a = srcA;
6470 HRegion b = srcB;
6471
6472
6473
6474 if (srcA.getRegionInfo().getStartKey() == null) {
6475 if (srcB.getRegionInfo().getStartKey() == null) {
6476 throw new IOException("Cannot merge two regions with null start key");
6477 }
6478
6479 } else if ((srcB.getRegionInfo().getStartKey() == null) ||
6480 (Bytes.compareTo(srcA.getRegionInfo().getStartKey(),
6481 srcB.getRegionInfo().getStartKey()) > 0)) {
6482 a = srcB;
6483 b = srcA;
6484 }
6485
6486 if (!(Bytes.compareTo(a.getRegionInfo().getEndKey(),
6487 b.getRegionInfo().getStartKey()) == 0)) {
6488 throw new IOException("Cannot merge non-adjacent regions");
6489 }
6490 return merge(a, b);
6491 }
6492
6493
6494
6495
6496
6497
6498
6499
6500
6501 public static HRegion merge(final HRegion a, final HRegion b) throws IOException {
6502 if (!a.getRegionInfo().getTable().equals(b.getRegionInfo().getTable())) {
6503 throw new IOException("Regions do not belong to the same table");
6504 }
6505
6506 FileSystem fs = a.getRegionFileSystem().getFileSystem();
6507
6508 a.flush(true);
6509 b.flush(true);
6510
6511
6512 a.compact(true);
6513 if (LOG.isDebugEnabled()) {
6514 LOG.debug("Files for region: " + a);
6515 a.getRegionFileSystem().logFileSystemState(LOG);
6516 }
6517 b.compact(true);
6518 if (LOG.isDebugEnabled()) {
6519 LOG.debug("Files for region: " + b);
6520 b.getRegionFileSystem().logFileSystemState(LOG);
6521 }
6522
6523 RegionMergeTransactionImpl rmt = new RegionMergeTransactionImpl(a, b, true);
6524 if (!rmt.prepare(null)) {
6525 throw new IOException("Unable to merge regions " + a + " and " + b);
6526 }
6527 HRegionInfo mergedRegionInfo = rmt.getMergedRegionInfo();
6528 LOG.info("starting merge of regions: " + a + " and " + b
6529 + " into new region " + mergedRegionInfo.getRegionNameAsString()
6530 + " with start key <"
6531 + Bytes.toStringBinary(mergedRegionInfo.getStartKey())
6532 + "> and end key <"
6533 + Bytes.toStringBinary(mergedRegionInfo.getEndKey()) + ">");
6534 HRegion dstRegion;
6535 try {
6536 dstRegion = (HRegion)rmt.execute(null, null);
6537 } catch (IOException ioe) {
6538 rmt.rollback(null, null);
6539 throw new IOException("Failed merging region " + a + " and " + b
6540 + ", and successfully rolled back");
6541 }
6542 dstRegion.compact(true);
6543
6544 if (LOG.isDebugEnabled()) {
6545 LOG.debug("Files for new region");
6546 dstRegion.getRegionFileSystem().logFileSystemState(LOG);
6547 }
6548
6549 if (dstRegion.getRegionFileSystem().hasReferences(dstRegion.getTableDesc())) {
6550 throw new IOException("Merged region " + dstRegion
6551 + " still has references after the compaction, is compaction canceled?");
6552 }
6553
6554
6555 HFileArchiver.archiveRegion(a.getBaseConf(), fs, a.getRegionInfo());
6556
6557 HFileArchiver.archiveRegion(b.getBaseConf(), fs, b.getRegionInfo());
6558
6559 LOG.info("merge completed. New region is " + dstRegion);
6560 return dstRegion;
6561 }
6562
6563 @Override
6564 public Result get(final Get get) throws IOException {
6565 checkRow(get.getRow(), "Get");
6566
6567 if (get.hasFamilies()) {
6568 for (byte [] family: get.familySet()) {
6569 checkFamily(family);
6570 }
6571 } else {
6572 for (byte[] family: this.htableDescriptor.getFamiliesKeys()) {
6573 get.addFamily(family);
6574 }
6575 }
6576 List<Cell> results = get(get, true);
6577 boolean stale = this.getRegionInfo().getReplicaId() != 0;
6578 return Result.create(results, get.isCheckExistenceOnly() ? !results.isEmpty() : null, stale);
6579 }
6580
6581 @Override
6582 public List<Cell> get(Get get, boolean withCoprocessor) throws IOException {
6583
6584 List<Cell> results = new ArrayList<Cell>();
6585
6586
6587 if (withCoprocessor && (coprocessorHost != null)) {
6588 if (coprocessorHost.preGet(get, results)) {
6589 return results;
6590 }
6591 }
6592
6593 Scan scan = new Scan(get);
6594
6595 RegionScanner scanner = null;
6596 try {
6597 scanner = getScanner(scan);
6598 scanner.next(results);
6599 } finally {
6600 if (scanner != null)
6601 scanner.close();
6602 }
6603
6604
6605 if (withCoprocessor && (coprocessorHost != null)) {
6606 coprocessorHost.postGet(get, results);
6607 }
6608
6609
6610 if (this.metricsRegion != null) {
6611 long totalSize = 0L;
6612 for (Cell cell : results) {
6613 totalSize += CellUtil.estimatedSerializedSizeOf(cell);
6614 }
6615 this.metricsRegion.updateGet(totalSize);
6616 }
6617
6618 return results;
6619 }
6620
6621 @Override
6622 public void mutateRow(RowMutations rm) throws IOException {
6623
6624 mutateRowsWithLocks(rm.getMutations(), Collections.singleton(rm.getRow()));
6625 }
6626
6627
6628
6629
6630
6631 public void mutateRowsWithLocks(Collection<Mutation> mutations,
6632 Collection<byte[]> rowsToLock) throws IOException {
6633 mutateRowsWithLocks(mutations, rowsToLock, HConstants.NO_NONCE, HConstants.NO_NONCE);
6634 }
6635
6636
6637
6638
6639
6640
6641
6642
6643
6644
6645
6646
6647
6648 @Override
6649 public void mutateRowsWithLocks(Collection<Mutation> mutations,
6650 Collection<byte[]> rowsToLock, long nonceGroup, long nonce) throws IOException {
6651 MultiRowMutationProcessor proc = new MultiRowMutationProcessor(mutations, rowsToLock);
6652 processRowsWithLocks(proc, -1, nonceGroup, nonce);
6653 }
6654
6655
6656
6657
6658 public ClientProtos.RegionLoadStats getRegionStats() {
6659 if (!regionStatsEnabled) {
6660 return null;
6661 }
6662 ClientProtos.RegionLoadStats.Builder stats = ClientProtos.RegionLoadStats.newBuilder();
6663 stats.setMemstoreLoad((int) (Math.min(100, (this.memstoreSize.get() * 100) / this
6664 .memstoreFlushSize)));
6665 stats.setHeapOccupancy((int)rsServices.getHeapMemoryManager().getHeapOccupancyPercent()*100);
6666 return stats.build();
6667 }
6668
6669 @Override
6670 public void processRowsWithLocks(RowProcessor<?,?> processor) throws IOException {
6671 processRowsWithLocks(processor, rowProcessorTimeout, HConstants.NO_NONCE,
6672 HConstants.NO_NONCE);
6673 }
6674
6675 @Override
6676 public void processRowsWithLocks(RowProcessor<?,?> processor, long nonceGroup, long nonce)
6677 throws IOException {
6678 processRowsWithLocks(processor, rowProcessorTimeout, nonceGroup, nonce);
6679 }
6680
6681 @Override
6682 public void processRowsWithLocks(RowProcessor<?,?> processor, long timeout,
6683 long nonceGroup, long nonce) throws IOException {
6684
6685 for (byte[] row : processor.getRowsToLock()) {
6686 checkRow(row, "processRowsWithLocks");
6687 }
6688 if (!processor.readOnly()) {
6689 checkReadOnly();
6690 }
6691 checkResources();
6692
6693 startRegionOperation();
6694 WALEdit walEdit = new WALEdit();
6695
6696
6697 try {
6698 processor.preProcess(this, walEdit);
6699 } catch (IOException e) {
6700 closeRegionOperation();
6701 throw e;
6702 }
6703
6704 if (processor.readOnly()) {
6705 try {
6706 long now = EnvironmentEdgeManager.currentTime();
6707 doProcessRowWithTimeout(
6708 processor, now, this, null, null, timeout);
6709 processor.postProcess(this, walEdit, true);
6710 } finally {
6711 closeRegionOperation();
6712 }
6713 return;
6714 }
6715
6716 MultiVersionConsistencyControl.WriteEntry writeEntry = null;
6717 boolean locked;
6718 boolean walSyncSuccessful = false;
6719 List<RowLock> acquiredRowLocks;
6720 long addedSize = 0;
6721 List<Mutation> mutations = new ArrayList<Mutation>();
6722 List<Cell> memstoreCells = new ArrayList<Cell>();
6723 Collection<byte[]> rowsToLock = processor.getRowsToLock();
6724 long mvccNum = 0;
6725 WALKey walKey = null;
6726 try {
6727
6728 acquiredRowLocks = new ArrayList<RowLock>(rowsToLock.size());
6729 for (byte[] row : rowsToLock) {
6730
6731 acquiredRowLocks.add(getRowLock(row));
6732 }
6733
6734 lock(this.updatesLock.readLock(), acquiredRowLocks.size() == 0 ? 1 : acquiredRowLocks.size());
6735 locked = true;
6736
6737 mvccNum = MultiVersionConsistencyControl.getPreAssignedWriteNumber(this.sequenceId);
6738
6739 long now = EnvironmentEdgeManager.currentTime();
6740 try {
6741
6742
6743 doProcessRowWithTimeout(
6744 processor, now, this, mutations, walEdit, timeout);
6745
6746 if (!mutations.isEmpty()) {
6747
6748 writeEntry = mvcc.beginMemstoreInsertWithSeqNum(mvccNum);
6749
6750 processor.preBatchMutate(this, walEdit);
6751
6752 for (Mutation m : mutations) {
6753
6754 rewriteCellTags(m.getFamilyCellMap(), m);
6755
6756 for (CellScanner cellScanner = m.cellScanner(); cellScanner.advance();) {
6757 Cell cell = cellScanner.current();
6758 CellUtil.setSequenceId(cell, mvccNum);
6759 Store store = getStore(cell);
6760 if (store == null) {
6761 checkFamily(CellUtil.cloneFamily(cell));
6762
6763 }
6764 Pair<Long, Cell> ret = store.add(cell);
6765 addedSize += ret.getFirst();
6766 memstoreCells.add(ret.getSecond());
6767 }
6768 }
6769
6770 long txid = 0;
6771
6772 if (!walEdit.isEmpty()) {
6773
6774 walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
6775 this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now,
6776 processor.getClusterIds(), nonceGroup, nonce);
6777 txid = this.wal.append(this.htableDescriptor, this.getRegionInfo(),
6778 walKey, walEdit, getSequenceId(), true, memstoreCells);
6779 }
6780 if(walKey == null){
6781
6782
6783 walKey = this.appendEmptyEdit(this.wal, memstoreCells);
6784 }
6785
6786 if (locked) {
6787 this.updatesLock.readLock().unlock();
6788 locked = false;
6789 }
6790
6791
6792 releaseRowLocks(acquiredRowLocks);
6793
6794
6795 if (txid != 0) {
6796 syncOrDefer(txid, getEffectiveDurability(processor.useDurability()));
6797 }
6798 walSyncSuccessful = true;
6799
6800 processor.postBatchMutate(this);
6801 }
6802 } finally {
6803
6804
6805
6806 if (!mutations.isEmpty() && !walSyncSuccessful) {
6807 LOG.warn("Wal sync failed. Roll back " + mutations.size() +
6808 " memstore keyvalues for row(s):" + StringUtils.byteToHexString(
6809 processor.getRowsToLock().iterator().next()) + "...");
6810 for (Mutation m : mutations) {
6811 for (CellScanner cellScanner = m.cellScanner(); cellScanner.advance();) {
6812 Cell cell = cellScanner.current();
6813 getStore(cell).rollback(cell);
6814 }
6815 }
6816 if (writeEntry != null) {
6817 mvcc.cancelMemstoreInsert(writeEntry);
6818 writeEntry = null;
6819 }
6820 }
6821
6822 if (writeEntry != null) {
6823 mvcc.completeMemstoreInsertWithSeqNum(writeEntry, walKey);
6824 }
6825 if (locked) {
6826 this.updatesLock.readLock().unlock();
6827 }
6828
6829 releaseRowLocks(acquiredRowLocks);
6830 }
6831
6832
6833 processor.postProcess(this, walEdit, walSyncSuccessful);
6834
6835 } finally {
6836 closeRegionOperation();
6837 if (!mutations.isEmpty() &&
6838 isFlushSize(this.addAndGetGlobalMemstoreSize(addedSize))) {
6839 requestFlush();
6840 }
6841 }
6842 }
6843
6844 private void doProcessRowWithTimeout(final RowProcessor<?,?> processor,
6845 final long now,
6846 final HRegion region,
6847 final List<Mutation> mutations,
6848 final WALEdit walEdit,
6849 final long timeout) throws IOException {
6850
6851 if (timeout < 0) {
6852 try {
6853 processor.process(now, region, mutations, walEdit);
6854 } catch (IOException e) {
6855 LOG.warn("RowProcessor:" + processor.getClass().getName() +
6856 " throws Exception on row(s):" +
6857 Bytes.toStringBinary(
6858 processor.getRowsToLock().iterator().next()) + "...", e);
6859 throw e;
6860 }
6861 return;
6862 }
6863
6864
6865 FutureTask<Void> task =
6866 new FutureTask<Void>(new Callable<Void>() {
6867 @Override
6868 public Void call() throws IOException {
6869 try {
6870 processor.process(now, region, mutations, walEdit);
6871 return null;
6872 } catch (IOException e) {
6873 LOG.warn("RowProcessor:" + processor.getClass().getName() +
6874 " throws Exception on row(s):" +
6875 Bytes.toStringBinary(
6876 processor.getRowsToLock().iterator().next()) + "...", e);
6877 throw e;
6878 }
6879 }
6880 });
6881 rowProcessorExecutor.execute(task);
6882 try {
6883 task.get(timeout, TimeUnit.MILLISECONDS);
6884 } catch (TimeoutException te) {
6885 LOG.error("RowProcessor timeout:" + timeout + " ms on row(s):" +
6886 Bytes.toStringBinary(processor.getRowsToLock().iterator().next()) +
6887 "...");
6888 throw new IOException(te);
6889 } catch (Exception e) {
6890 throw new IOException(e);
6891 }
6892 }
6893
6894 public Result append(Append append) throws IOException {
6895 return append(append, HConstants.NO_NONCE, HConstants.NO_NONCE);
6896 }
6897
6898
6899
6900
6901
6902 @Override
6903 public Result append(Append append, long nonceGroup, long nonce) throws IOException {
6904 byte[] row = append.getRow();
6905 checkRow(row, "append");
6906 boolean flush = false;
6907 Durability durability = getEffectiveDurability(append.getDurability());
6908 boolean writeToWAL = durability != Durability.SKIP_WAL;
6909 WALEdit walEdits = null;
6910 List<Cell> allKVs = new ArrayList<Cell>(append.size());
6911 Map<Store, List<Cell>> tempMemstore = new HashMap<Store, List<Cell>>();
6912 long size = 0;
6913 long txid = 0;
6914
6915 checkReadOnly();
6916 checkResources();
6917
6918 startRegionOperation(Operation.APPEND);
6919 this.writeRequestsCount.increment();
6920 long mvccNum = 0;
6921 WriteEntry writeEntry = null;
6922 WALKey walKey = null;
6923 RowLock rowLock = null;
6924 List<Cell> memstoreCells = new ArrayList<Cell>();
6925 boolean doRollBackMemstore = false;
6926 try {
6927 rowLock = getRowLock(row);
6928 try {
6929 lock(this.updatesLock.readLock());
6930 try {
6931
6932
6933 mvcc.waitForPreviousTransactionsComplete();
6934 if (this.coprocessorHost != null) {
6935 Result r = this.coprocessorHost.preAppendAfterRowLock(append);
6936 if(r!= null) {
6937 return r;
6938 }
6939 }
6940
6941 mvccNum = MultiVersionConsistencyControl.getPreAssignedWriteNumber(this.sequenceId);
6942 writeEntry = mvcc.beginMemstoreInsertWithSeqNum(mvccNum);
6943 long now = EnvironmentEdgeManager.currentTime();
6944
6945 for (Map.Entry<byte[], List<Cell>> family : append.getFamilyCellMap().entrySet()) {
6946
6947 Store store = stores.get(family.getKey());
6948 List<Cell> kvs = new ArrayList<Cell>(family.getValue().size());
6949
6950
6951
6952
6953
6954 Collections.sort(family.getValue(), store.getComparator());
6955
6956 Get get = new Get(row);
6957 for (Cell cell : family.getValue()) {
6958 get.addColumn(family.getKey(), CellUtil.cloneQualifier(cell));
6959 }
6960 List<Cell> results = get(get, false);
6961
6962
6963
6964
6965
6966
6967
6968 int idx = 0;
6969 for (Cell cell : family.getValue()) {
6970 Cell newCell;
6971 Cell oldCell = null;
6972 if (idx < results.size()
6973 && CellUtil.matchingQualifier(results.get(idx), cell)) {
6974 oldCell = results.get(idx);
6975 long ts = Math.max(now, oldCell.getTimestamp());
6976
6977
6978 List<Tag> tags = Tag.carryForwardTags(null, oldCell);
6979 tags = Tag.carryForwardTags(tags, cell);
6980 tags = carryForwardTTLTag(tags, append);
6981
6982
6983 byte[] tagBytes = Tag.fromList(tags);
6984
6985
6986 newCell = new KeyValue(row.length, cell.getFamilyLength(),
6987 cell.getQualifierLength(), ts, KeyValue.Type.Put,
6988 oldCell.getValueLength() + cell.getValueLength(),
6989 tagBytes == null? 0: tagBytes.length);
6990
6991 System.arraycopy(cell.getRowArray(), cell.getRowOffset(),
6992 newCell.getRowArray(), newCell.getRowOffset(), cell.getRowLength());
6993 System.arraycopy(cell.getFamilyArray(), cell.getFamilyOffset(),
6994 newCell.getFamilyArray(), newCell.getFamilyOffset(),
6995 cell.getFamilyLength());
6996 System.arraycopy(cell.getQualifierArray(), cell.getQualifierOffset(),
6997 newCell.getQualifierArray(), newCell.getQualifierOffset(),
6998 cell.getQualifierLength());
6999
7000 System.arraycopy(oldCell.getValueArray(), oldCell.getValueOffset(),
7001 newCell.getValueArray(), newCell.getValueOffset(),
7002 oldCell.getValueLength());
7003 System.arraycopy(cell.getValueArray(), cell.getValueOffset(),
7004 newCell.getValueArray(),
7005 newCell.getValueOffset() + oldCell.getValueLength(),
7006 cell.getValueLength());
7007
7008 if (tagBytes != null) {
7009 System.arraycopy(tagBytes, 0, newCell.getTagsArray(), newCell.getTagsOffset(),
7010 tagBytes.length);
7011 }
7012 idx++;
7013 } else {
7014
7015 CellUtil.updateLatestStamp(cell, now);
7016
7017
7018
7019 if (append.getTTL() != Long.MAX_VALUE) {
7020
7021 newCell = new KeyValue(cell.getRowArray(), cell.getRowOffset(),
7022 cell.getRowLength(),
7023 cell.getFamilyArray(), cell.getFamilyOffset(),
7024 cell.getFamilyLength(),
7025 cell.getQualifierArray(), cell.getQualifierOffset(),
7026 cell.getQualifierLength(),
7027 cell.getTimestamp(), KeyValue.Type.codeToType(cell.getTypeByte()),
7028 cell.getValueArray(), cell.getValueOffset(), cell.getValueLength(),
7029 carryForwardTTLTag(append));
7030 } else {
7031 newCell = cell;
7032 }
7033 }
7034
7035 CellUtil.setSequenceId(newCell, mvccNum);
7036
7037 if (coprocessorHost != null) {
7038 newCell = coprocessorHost.postMutationBeforeWAL(RegionObserver.MutationType.APPEND,
7039 append, oldCell, newCell);
7040 }
7041 kvs.add(newCell);
7042
7043
7044 if (writeToWAL) {
7045 if (walEdits == null) {
7046 walEdits = new WALEdit();
7047 }
7048 walEdits.add(newCell);
7049 }
7050 }
7051
7052
7053 tempMemstore.put(store, kvs);
7054 }
7055
7056
7057 for (Map.Entry<Store, List<Cell>> entry : tempMemstore.entrySet()) {
7058 Store store = entry.getKey();
7059 if (store.getFamily().getMaxVersions() == 1) {
7060
7061 size += store.upsert(entry.getValue(), getSmallestReadPoint());
7062 memstoreCells.addAll(entry.getValue());
7063 } else {
7064
7065 for (Cell cell: entry.getValue()) {
7066 Pair<Long, Cell> ret = store.add(cell);
7067 size += ret.getFirst();
7068 memstoreCells.add(ret.getSecond());
7069 doRollBackMemstore = true;
7070 }
7071 }
7072 allKVs.addAll(entry.getValue());
7073 }
7074
7075
7076 if (writeToWAL) {
7077
7078
7079
7080
7081 walKey = new HLogKey(getRegionInfo().getEncodedNameAsBytes(),
7082 this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, nonceGroup, nonce);
7083 txid = this.wal.append(this.htableDescriptor, getRegionInfo(), walKey, walEdits,
7084 this.sequenceId, true, memstoreCells);
7085 } else {
7086 recordMutationWithoutWal(append.getFamilyCellMap());
7087 }
7088 if (walKey == null) {
7089
7090 walKey = this.appendEmptyEdit(this.wal, memstoreCells);
7091 }
7092 size = this.addAndGetGlobalMemstoreSize(size);
7093 flush = isFlushSize(size);
7094 } finally {
7095 this.updatesLock.readLock().unlock();
7096 }
7097 } finally {
7098 rowLock.release();
7099 rowLock = null;
7100 }
7101
7102 if(txid != 0){
7103 syncOrDefer(txid, durability);
7104 }
7105 doRollBackMemstore = false;
7106 } finally {
7107 if (rowLock != null) {
7108 rowLock.release();
7109 }
7110
7111 if (doRollBackMemstore) {
7112 rollbackMemstore(memstoreCells);
7113 if (writeEntry != null) mvcc.cancelMemstoreInsert(writeEntry);
7114 } else if (writeEntry != null) {
7115 mvcc.completeMemstoreInsertWithSeqNum(writeEntry, walKey);
7116 }
7117
7118 closeRegionOperation(Operation.APPEND);
7119 }
7120
7121 if (this.metricsRegion != null) {
7122 this.metricsRegion.updateAppend();
7123 }
7124
7125 if (flush) {
7126
7127 requestFlush();
7128 }
7129
7130
7131 return append.isReturnResults() ? Result.create(allKVs) : null;
7132 }
7133
7134 public Result increment(Increment increment) throws IOException {
7135 return increment(increment, HConstants.NO_NONCE, HConstants.NO_NONCE);
7136 }
7137
7138
7139
7140
7141
7142 @Override
7143 public Result increment(Increment increment, long nonceGroup, long nonce)
7144 throws IOException {
7145 checkReadOnly();
7146 checkResources();
7147 checkRow(increment.getRow(), "increment");
7148 checkFamilies(increment.getFamilyCellMap().keySet());
7149 startRegionOperation(Operation.INCREMENT);
7150 this.writeRequestsCount.increment();
7151 try {
7152 return doIncrement(increment, nonceGroup, nonce);
7153 } finally {
7154 if (this.metricsRegion != null) this.metricsRegion.updateIncrement();
7155 closeRegionOperation(Operation.INCREMENT);
7156 }
7157 }
7158
7159 private Result doIncrement(Increment increment, long nonceGroup, long nonce) throws IOException {
7160 RowLock rowLock = null;
7161 WriteEntry writeEntry = null;
7162 WALKey walKey = null;
7163 boolean doRollBackMemstore = false;
7164 long accumulatedResultSize = 0;
7165 List<Cell> allKVs = new ArrayList<Cell>(increment.size());
7166 List<Cell> memstoreCells = new ArrayList<Cell>();
7167 Durability effectiveDurability = getEffectiveDurability(increment.getDurability());
7168 try {
7169 rowLock = getRowLock(increment.getRow());
7170 long txid = 0;
7171 try {
7172 lock(this.updatesLock.readLock());
7173 try {
7174
7175
7176 this.mvcc.waitForPreviousTransactionsComplete();
7177 if (this.coprocessorHost != null) {
7178 Result r = this.coprocessorHost.preIncrementAfterRowLock(increment);
7179 if (r != null) return r;
7180 }
7181
7182 long mvccNum = MultiVersionConsistencyControl.getPreAssignedWriteNumber(this.sequenceId);
7183 writeEntry = this.mvcc.beginMemstoreInsertWithSeqNum(mvccNum);
7184
7185
7186 long now = EnvironmentEdgeManager.currentTime();
7187 final boolean writeToWAL = effectiveDurability != Durability.SKIP_WAL;
7188 WALEdit walEdits = null;
7189 for (Map.Entry<byte [], List<Cell>> entry: increment.getFamilyCellMap().entrySet()) {
7190 byte [] columnFamilyName = entry.getKey();
7191 List<Cell> increments = entry.getValue();
7192 Store store = this.stores.get(columnFamilyName);
7193
7194
7195 List<Cell> results = applyIncrementsToColumnFamily(increment, columnFamilyName,
7196 sort(increments, store.getComparator()), now, mvccNum, allKVs, null);
7197 if (!results.isEmpty()) {
7198
7199 if (writeToWAL) {
7200
7201
7202 int resultsSize = results.size();
7203 for (int i = 0; i < resultsSize; i++) {
7204 if (walEdits == null) walEdits = new WALEdit();
7205 walEdits.add(results.get(i));
7206 }
7207 }
7208
7209 if (store.getFamily().getMaxVersions() == 1) {
7210
7211 accumulatedResultSize += store.upsert(results, getSmallestReadPoint());
7212 memstoreCells.addAll(results);
7213
7214 } else {
7215
7216 for (Cell cell: results) {
7217 Pair<Long, Cell> ret = store.add(cell);
7218 accumulatedResultSize += ret.getFirst();
7219 memstoreCells.add(ret.getSecond());
7220 doRollBackMemstore = true;
7221 }
7222 }
7223 }
7224 }
7225
7226
7227 if (walEdits != null && !walEdits.isEmpty()) {
7228 if (writeToWAL) {
7229
7230
7231
7232 walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
7233 this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, nonceGroup, nonce);
7234 txid = this.wal.append(this.htableDescriptor, this.getRegionInfo(),
7235 walKey, walEdits, getSequenceId(), true, memstoreCells);
7236 } else {
7237 recordMutationWithoutWal(increment.getFamilyCellMap());
7238 }
7239 }
7240 if (walKey == null) {
7241
7242 walKey = this.appendEmptyEdit(this.wal, memstoreCells);
7243 }
7244 } finally {
7245 this.updatesLock.readLock().unlock();
7246 }
7247 } finally {
7248 rowLock.release();
7249 rowLock = null;
7250 }
7251
7252 if (txid != 0) syncOrDefer(txid, effectiveDurability);
7253 doRollBackMemstore = false;
7254 } finally {
7255 if (rowLock != null) rowLock.release();
7256
7257 if (doRollBackMemstore) rollbackMemstore(memstoreCells);
7258 if (writeEntry != null) mvcc.completeMemstoreInsertWithSeqNum(writeEntry, walKey);
7259 }
7260
7261 if (isFlushSize(this.addAndGetGlobalMemstoreSize(accumulatedResultSize))) requestFlush();
7262 return increment.isReturnResults() ? Result.create(allKVs) : null;
7263 }
7264
7265
7266
7267
7268 private static List<Cell> sort(List<Cell> cells, final Comparator<Cell> comparator) {
7269 Collections.sort(cells, comparator);
7270 return cells;
7271 }
7272
7273
7274
7275
7276
7277
7278
7279
7280
7281
7282
7283
7284 private List<Cell> applyIncrementsToColumnFamily(Increment increment, byte[] columnFamilyName,
7285 List<Cell> sortedIncrements, long now, long mvccNum, List<Cell> allKVs,
7286 final IsolationLevel isolation)
7287 throws IOException {
7288 List<Cell> results = new ArrayList<Cell>(sortedIncrements.size());
7289 byte [] row = increment.getRow();
7290
7291 List<Cell> currentValues =
7292 getIncrementCurrentValue(increment, columnFamilyName, sortedIncrements, isolation);
7293
7294
7295 int idx = 0;
7296 for (int i = 0; i < sortedIncrements.size(); i++) {
7297 Cell inc = sortedIncrements.get(i);
7298 long incrementAmount = getLongValue(inc);
7299
7300 boolean writeBack = (incrementAmount != 0);
7301
7302 List<Tag> tags = Tag.carryForwardTags(inc);
7303
7304 Cell currentValue = null;
7305 long ts = now;
7306 if (idx < currentValues.size() && CellUtil.matchingQualifier(currentValues.get(idx), inc)) {
7307 currentValue = currentValues.get(idx);
7308 ts = Math.max(now, currentValue.getTimestamp());
7309 incrementAmount += getLongValue(currentValue);
7310
7311 tags = Tag.carryForwardTags(tags, currentValue);
7312 if (i < (sortedIncrements.size() - 1) &&
7313 !CellUtil.matchingQualifier(inc, sortedIncrements.get(i + 1))) idx++;
7314 }
7315
7316
7317 byte [] qualifier = CellUtil.cloneQualifier(inc);
7318 byte [] incrementAmountInBytes = Bytes.toBytes(incrementAmount);
7319 tags = carryForwardTTLTag(tags, increment);
7320
7321 Cell newValue = new KeyValue(row, 0, row.length,
7322 columnFamilyName, 0, columnFamilyName.length,
7323 qualifier, 0, qualifier.length,
7324 ts, KeyValue.Type.Put,
7325 incrementAmountInBytes, 0, incrementAmountInBytes.length,
7326 tags);
7327
7328
7329
7330 if (mvccNum != MultiVersionConsistencyControl.NO_WRITE_NUMBER) {
7331 CellUtil.setSequenceId(newValue, mvccNum);
7332 }
7333
7334
7335 if (coprocessorHost != null) {
7336 newValue = coprocessorHost.postMutationBeforeWAL(
7337 RegionObserver.MutationType.INCREMENT, increment, currentValue, newValue);
7338 }
7339 allKVs.add(newValue);
7340 if (writeBack) {
7341 results.add(newValue);
7342 }
7343 }
7344 return results;
7345 }
7346
7347
7348
7349
7350
7351 private static long getLongValue(final Cell cell) throws DoNotRetryIOException {
7352 int len = cell.getValueLength();
7353 if (len != Bytes.SIZEOF_LONG) {
7354
7355 throw new DoNotRetryIOException("Field is not a long, it's " + len + " bytes wide");
7356 }
7357 return Bytes.toLong(cell.getValueArray(), cell.getValueOffset(), len);
7358 }
7359
7360
7361
7362
7363
7364
7365
7366
7367
7368
7369 private List<Cell> getIncrementCurrentValue(final Increment increment, byte [] columnFamily,
7370 final List<Cell> increments, final IsolationLevel isolation)
7371 throws IOException {
7372 Get get = new Get(increment.getRow());
7373 if (isolation != null) get.setIsolationLevel(isolation);
7374 for (Cell cell: increments) {
7375 get.addColumn(columnFamily, CellUtil.cloneQualifier(cell));
7376 }
7377 TimeRange tr = increment.getTimeRange();
7378 get.setTimeRange(tr.getMin(), tr.getMax());
7379 return get(get, false);
7380 }
7381
7382 private static List<Tag> carryForwardTTLTag(final Mutation mutation) {
7383 return carryForwardTTLTag(null, mutation);
7384 }
7385
7386
7387
7388
7389 private static List<Tag> carryForwardTTLTag(final List<Tag> tagsOrNull,
7390 final Mutation mutation) {
7391 long ttl = mutation.getTTL();
7392 if (ttl == Long.MAX_VALUE) return tagsOrNull;
7393 List<Tag> tags = tagsOrNull;
7394
7395
7396
7397 if (tags == null) tags = new ArrayList<Tag>(1);
7398 tags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(ttl)));
7399 return tags;
7400 }
7401
7402
7403
7404
7405
7406 private void checkFamily(final byte [] family)
7407 throws NoSuchColumnFamilyException {
7408 if (!this.htableDescriptor.hasFamily(family)) {
7409 throw new NoSuchColumnFamilyException("Column family " +
7410 Bytes.toString(family) + " does not exist in region " + this
7411 + " in table " + this.htableDescriptor);
7412 }
7413 }
7414
7415 public static final long FIXED_OVERHEAD = ClassSize.align(
7416 ClassSize.OBJECT +
7417 ClassSize.ARRAY +
7418 45 * ClassSize.REFERENCE + 3 * Bytes.SIZEOF_INT +
7419 (14 * Bytes.SIZEOF_LONG) +
7420 5 * Bytes.SIZEOF_BOOLEAN);
7421
7422
7423
7424
7425
7426
7427
7428
7429
7430
7431
7432 public static final long DEEP_OVERHEAD = FIXED_OVERHEAD +
7433 ClassSize.OBJECT +
7434 (2 * ClassSize.ATOMIC_BOOLEAN) +
7435 (3 * ClassSize.ATOMIC_LONG) +
7436 (2 * ClassSize.CONCURRENT_HASHMAP) +
7437 WriteState.HEAP_SIZE +
7438 ClassSize.CONCURRENT_SKIPLISTMAP + ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY +
7439 (2 * ClassSize.REENTRANT_LOCK) +
7440 MultiVersionConsistencyControl.FIXED_SIZE
7441 + ClassSize.TREEMAP
7442 + 2 * ClassSize.ATOMIC_INTEGER
7443 ;
7444
7445 @Override
7446 public long heapSize() {
7447 long heapSize = DEEP_OVERHEAD;
7448 for (Store store : this.stores.values()) {
7449 heapSize += store.heapSize();
7450 }
7451
7452 return heapSize;
7453 }
7454
7455
7456
7457
7458
7459 private static void printUsageAndExit(final String message) {
7460 if (message != null && message.length() > 0) System.out.println(message);
7461 System.out.println("Usage: HRegion CATALOG_TABLE_DIR [major_compact]");
7462 System.out.println("Options:");
7463 System.out.println(" major_compact Pass this option to major compact " +
7464 "passed region.");
7465 System.out.println("Default outputs scan of passed region.");
7466 System.exit(1);
7467 }
7468
7469 @Override
7470 public boolean registerService(Service instance) {
7471
7472
7473
7474 Descriptors.ServiceDescriptor serviceDesc = instance.getDescriptorForType();
7475 if (coprocessorServiceHandlers.containsKey(serviceDesc.getFullName())) {
7476 LOG.error("Coprocessor service "+serviceDesc.getFullName()+
7477 " already registered, rejecting request from "+instance
7478 );
7479 return false;
7480 }
7481
7482 coprocessorServiceHandlers.put(serviceDesc.getFullName(), instance);
7483 if (LOG.isDebugEnabled()) {
7484 LOG.debug("Registered coprocessor service: region=" +
7485 Bytes.toStringBinary(getRegionInfo().getRegionName()) +
7486 " service=" + serviceDesc.getFullName());
7487 }
7488 return true;
7489 }
7490
7491 @Override
7492 public Message execService(RpcController controller, CoprocessorServiceCall call)
7493 throws IOException {
7494 String serviceName = call.getServiceName();
7495 String methodName = call.getMethodName();
7496 if (!coprocessorServiceHandlers.containsKey(serviceName)) {
7497 throw new UnknownProtocolException(null,
7498 "No registered coprocessor service found for name "+serviceName+
7499 " in region "+Bytes.toStringBinary(getRegionInfo().getRegionName()));
7500 }
7501
7502 Service service = coprocessorServiceHandlers.get(serviceName);
7503 Descriptors.ServiceDescriptor serviceDesc = service.getDescriptorForType();
7504 Descriptors.MethodDescriptor methodDesc = serviceDesc.findMethodByName(methodName);
7505 if (methodDesc == null) {
7506 throw new UnknownProtocolException(service.getClass(),
7507 "Unknown method "+methodName+" called on service "+serviceName+
7508 " in region "+Bytes.toStringBinary(getRegionInfo().getRegionName()));
7509 }
7510
7511 Message.Builder builder = service.getRequestPrototype(methodDesc).newBuilderForType();
7512 ProtobufUtil.mergeFrom(builder, call.getRequest());
7513 Message request = builder.build();
7514
7515 if (coprocessorHost != null) {
7516 request = coprocessorHost.preEndpointInvocation(service, methodName, request);
7517 }
7518
7519 final Message.Builder responseBuilder =
7520 service.getResponsePrototype(methodDesc).newBuilderForType();
7521 service.callMethod(methodDesc, controller, request, new RpcCallback<Message>() {
7522 @Override
7523 public void run(Message message) {
7524 if (message != null) {
7525 responseBuilder.mergeFrom(message);
7526 }
7527 }
7528 });
7529
7530 if (coprocessorHost != null) {
7531 coprocessorHost.postEndpointInvocation(service, methodName, request, responseBuilder);
7532 }
7533
7534 return responseBuilder.build();
7535 }
7536
7537
7538
7539
7540
7541
7542 private static void processTable(final FileSystem fs, final Path p,
7543 final WALFactory walFactory, final Configuration c,
7544 final boolean majorCompact)
7545 throws IOException {
7546 HRegion region;
7547 FSTableDescriptors fst = new FSTableDescriptors(c);
7548
7549 if (FSUtils.getTableName(p).equals(TableName.META_TABLE_NAME)) {
7550 final WAL wal = walFactory.getMetaWAL(
7551 HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes());
7552 region = HRegion.newHRegion(p, wal, fs, c,
7553 HRegionInfo.FIRST_META_REGIONINFO, fst.get(TableName.META_TABLE_NAME), null);
7554 } else {
7555 throw new IOException("Not a known catalog table: " + p.toString());
7556 }
7557 try {
7558 region.initialize(null);
7559 if (majorCompact) {
7560 region.compact(true);
7561 } else {
7562
7563 Scan scan = new Scan();
7564
7565 RegionScanner scanner = region.getScanner(scan);
7566 try {
7567 List<Cell> kvs = new ArrayList<Cell>();
7568 boolean done;
7569 do {
7570 kvs.clear();
7571 done = scanner.next(kvs);
7572 if (kvs.size() > 0) LOG.info(kvs);
7573 } while (done);
7574 } finally {
7575 scanner.close();
7576 }
7577 }
7578 } finally {
7579 region.close();
7580 }
7581 }
7582
7583 boolean shouldForceSplit() {
7584 return this.splitRequest;
7585 }
7586
7587 byte[] getExplicitSplitPoint() {
7588 return this.explicitSplitPoint;
7589 }
7590
7591 void forceSplit(byte[] sp) {
7592
7593
7594 this.splitRequest = true;
7595 if (sp != null) {
7596 this.explicitSplitPoint = sp;
7597 }
7598 }
7599
7600 void clearSplit() {
7601 this.splitRequest = false;
7602 this.explicitSplitPoint = null;
7603 }
7604
7605
7606
7607
7608 protected void prepareToSplit() {
7609
7610 }
7611
7612
7613
7614
7615
7616
7617
7618 public byte[] checkSplit() {
7619
7620 if (this.getRegionInfo().isMetaTable() ||
7621 TableName.NAMESPACE_TABLE_NAME.equals(this.getRegionInfo().getTable())) {
7622 if (shouldForceSplit()) {
7623 LOG.warn("Cannot split meta region in HBase 0.20 and above");
7624 }
7625 return null;
7626 }
7627
7628
7629 if (this.isRecovering()) {
7630 LOG.info("Cannot split region " + this.getRegionInfo().getEncodedName() + " in recovery.");
7631 return null;
7632 }
7633
7634 if (!splitPolicy.shouldSplit()) {
7635 return null;
7636 }
7637
7638 byte[] ret = splitPolicy.getSplitPoint();
7639
7640 if (ret != null) {
7641 try {
7642 checkRow(ret, "calculated split");
7643 } catch (IOException e) {
7644 LOG.error("Ignoring invalid split", e);
7645 return null;
7646 }
7647 }
7648 return ret;
7649 }
7650
7651
7652
7653
7654 public int getCompactPriority() {
7655 int count = Integer.MAX_VALUE;
7656 for (Store store : stores.values()) {
7657 count = Math.min(count, store.getCompactPriority());
7658 }
7659 return count;
7660 }
7661
7662
7663
7664 @Override
7665 public RegionCoprocessorHost getCoprocessorHost() {
7666 return coprocessorHost;
7667 }
7668
7669
7670 public void setCoprocessorHost(final RegionCoprocessorHost coprocessorHost) {
7671 this.coprocessorHost = coprocessorHost;
7672 }
7673
7674 @Override
7675 public void startRegionOperation() throws IOException {
7676 startRegionOperation(Operation.ANY);
7677 }
7678
7679 @Override
7680 public void startRegionOperation(Operation op) throws IOException {
7681 switch (op) {
7682 case GET:
7683 case SCAN:
7684 checkReadsEnabled();
7685 case INCREMENT:
7686 case APPEND:
7687 case SPLIT_REGION:
7688 case MERGE_REGION:
7689 case PUT:
7690 case DELETE:
7691 case BATCH_MUTATE:
7692 case COMPACT_REGION:
7693
7694 if (isRecovering() && (this.disallowWritesInRecovering ||
7695 (op != Operation.PUT && op != Operation.DELETE && op != Operation.BATCH_MUTATE))) {
7696 throw new RegionInRecoveryException(getRegionInfo().getRegionNameAsString() +
7697 " is recovering; cannot take reads");
7698 }
7699 break;
7700 default:
7701 break;
7702 }
7703 if (op == Operation.MERGE_REGION || op == Operation.SPLIT_REGION
7704 || op == Operation.COMPACT_REGION) {
7705
7706
7707 return;
7708 }
7709 if (this.closing.get()) {
7710 throw new NotServingRegionException(getRegionInfo().getRegionNameAsString() + " is closing");
7711 }
7712 lock(lock.readLock());
7713 if (this.closed.get()) {
7714 lock.readLock().unlock();
7715 throw new NotServingRegionException(getRegionInfo().getRegionNameAsString() + " is closed");
7716 }
7717 try {
7718 if (coprocessorHost != null) {
7719 coprocessorHost.postStartRegionOperation(op);
7720 }
7721 } catch (Exception e) {
7722 lock.readLock().unlock();
7723 throw new IOException(e);
7724 }
7725 }
7726
7727 @Override
7728 public void closeRegionOperation() throws IOException {
7729 closeRegionOperation(Operation.ANY);
7730 }
7731
7732
7733
7734
7735
7736
7737 public void closeRegionOperation(Operation operation) throws IOException {
7738 lock.readLock().unlock();
7739 if (coprocessorHost != null) {
7740 coprocessorHost.postCloseRegionOperation(operation);
7741 }
7742 }
7743
7744
7745
7746
7747
7748
7749
7750
7751
7752
7753 private void startBulkRegionOperation(boolean writeLockNeeded)
7754 throws NotServingRegionException, RegionTooBusyException, InterruptedIOException {
7755 if (this.closing.get()) {
7756 throw new NotServingRegionException(getRegionInfo().getRegionNameAsString() + " is closing");
7757 }
7758 if (writeLockNeeded) lock(lock.writeLock());
7759 else lock(lock.readLock());
7760 if (this.closed.get()) {
7761 if (writeLockNeeded) lock.writeLock().unlock();
7762 else lock.readLock().unlock();
7763 throw new NotServingRegionException(getRegionInfo().getRegionNameAsString() + " is closed");
7764 }
7765 }
7766
7767
7768
7769
7770
7771 private void closeBulkRegionOperation(){
7772 if (lock.writeLock().isHeldByCurrentThread()) lock.writeLock().unlock();
7773 else lock.readLock().unlock();
7774 }
7775
7776
7777
7778
7779
7780 private void recordMutationWithoutWal(final Map<byte [], List<Cell>> familyMap) {
7781 numMutationsWithoutWAL.increment();
7782 if (numMutationsWithoutWAL.get() <= 1) {
7783 LOG.info("writing data to region " + this +
7784 " with WAL disabled. Data may be lost in the event of a crash.");
7785 }
7786
7787 long mutationSize = 0;
7788 for (List<Cell> cells: familyMap.values()) {
7789 assert cells instanceof RandomAccess;
7790 int listSize = cells.size();
7791 for (int i=0; i < listSize; i++) {
7792 Cell cell = cells.get(i);
7793
7794 mutationSize += KeyValueUtil.keyLength(cell) + cell.getValueLength();
7795 }
7796 }
7797
7798 dataInMemoryWithoutWAL.add(mutationSize);
7799 }
7800
7801 private void lock(final Lock lock)
7802 throws RegionTooBusyException, InterruptedIOException {
7803 lock(lock, 1);
7804 }
7805
7806
7807
7808
7809
7810
7811 private void lock(final Lock lock, final int multiplier)
7812 throws RegionTooBusyException, InterruptedIOException {
7813 try {
7814 final long waitTime = Math.min(maxBusyWaitDuration,
7815 busyWaitDuration * Math.min(multiplier, maxBusyWaitMultiplier));
7816 if (!lock.tryLock(waitTime, TimeUnit.MILLISECONDS)) {
7817 throw new RegionTooBusyException(
7818 "failed to get a lock in " + waitTime + " ms. " +
7819 "regionName=" + (this.getRegionInfo() == null ? "unknown" :
7820 this.getRegionInfo().getRegionNameAsString()) +
7821 ", server=" + (this.getRegionServerServices() == null ? "unknown" :
7822 this.getRegionServerServices().getServerName()));
7823 }
7824 } catch (InterruptedException ie) {
7825 LOG.info("Interrupted while waiting for a lock");
7826 InterruptedIOException iie = new InterruptedIOException();
7827 iie.initCause(ie);
7828 throw iie;
7829 }
7830 }
7831
7832
7833
7834
7835
7836
7837
7838 private void syncOrDefer(long txid, Durability durability) throws IOException {
7839 if (this.getRegionInfo().isMetaRegion()) {
7840 this.wal.sync(txid);
7841 } else {
7842 switch(durability) {
7843 case USE_DEFAULT:
7844
7845 if (shouldSyncWAL()) {
7846 this.wal.sync(txid);
7847 }
7848 break;
7849 case SKIP_WAL:
7850
7851 break;
7852 case ASYNC_WAL:
7853
7854 break;
7855 case SYNC_WAL:
7856 case FSYNC_WAL:
7857
7858 this.wal.sync(txid);
7859 break;
7860 }
7861 }
7862 }
7863
7864
7865
7866
7867 private boolean shouldSyncWAL() {
7868 return durability.ordinal() > Durability.ASYNC_WAL.ordinal();
7869 }
7870
7871
7872
7873
7874 private static final List<Cell> MOCKED_LIST = new AbstractList<Cell>() {
7875
7876 @Override
7877 public void add(int index, Cell element) {
7878
7879 }
7880
7881 @Override
7882 public boolean addAll(int index, Collection<? extends Cell> c) {
7883 return false;
7884 }
7885
7886 @Override
7887 public KeyValue get(int index) {
7888 throw new UnsupportedOperationException();
7889 }
7890
7891 @Override
7892 public int size() {
7893 return 0;
7894 }
7895 };
7896
7897
7898
7899
7900
7901
7902
7903
7904
7905
7906 public static void main(String[] args) throws IOException {
7907 if (args.length < 1) {
7908 printUsageAndExit(null);
7909 }
7910 boolean majorCompact = false;
7911 if (args.length > 1) {
7912 if (!args[1].toLowerCase().startsWith("major")) {
7913 printUsageAndExit("ERROR: Unrecognized option <" + args[1] + ">");
7914 }
7915 majorCompact = true;
7916 }
7917 final Path tableDir = new Path(args[0]);
7918 final Configuration c = HBaseConfiguration.create();
7919 final FileSystem fs = FileSystem.get(c);
7920 final Path logdir = new Path(c.get("hbase.tmp.dir"));
7921 final String logname = "wal" + FSUtils.getTableName(tableDir) + System.currentTimeMillis();
7922
7923 final Configuration walConf = new Configuration(c);
7924 FSUtils.setRootDir(walConf, logdir);
7925 final WALFactory wals = new WALFactory(walConf, null, logname);
7926 try {
7927 processTable(fs, tableDir, wals, c, majorCompact);
7928 } finally {
7929 wals.close();
7930
7931 BlockCache bc = new CacheConfig(c).getBlockCache();
7932 if (bc != null) bc.shutdown();
7933 }
7934 }
7935
7936 @Override
7937 public long getOpenSeqNum() {
7938 return this.openSeqNum;
7939 }
7940
7941 @Override
7942 public Map<byte[], Long> getMaxStoreSeqId() {
7943 return this.maxSeqIdInStores;
7944 }
7945
7946 @Override
7947 public long getOldestSeqIdOfStore(byte[] familyName) {
7948 return wal.getEarliestMemstoreSeqNum(getRegionInfo()
7949 .getEncodedNameAsBytes(), familyName);
7950 }
7951
7952 @Override
7953 public CompactionState getCompactionState() {
7954 boolean hasMajor = majorInProgress.get() > 0, hasMinor = minorInProgress.get() > 0;
7955 return (hasMajor ? (hasMinor ? CompactionState.MAJOR_AND_MINOR : CompactionState.MAJOR)
7956 : (hasMinor ? CompactionState.MINOR : CompactionState.NONE));
7957 }
7958
7959 public void reportCompactionRequestStart(boolean isMajor){
7960 (isMajor ? majorInProgress : minorInProgress).incrementAndGet();
7961 }
7962
7963 public void reportCompactionRequestEnd(boolean isMajor, int numFiles, long filesSizeCompacted) {
7964 int newValue = (isMajor ? majorInProgress : minorInProgress).decrementAndGet();
7965
7966
7967 compactionsFinished.incrementAndGet();
7968 compactionNumFilesCompacted.addAndGet(numFiles);
7969 compactionNumBytesCompacted.addAndGet(filesSizeCompacted);
7970
7971 assert newValue >= 0;
7972 }
7973
7974
7975
7976
7977
7978 @VisibleForTesting
7979 public AtomicLong getSequenceId() {
7980 return this.sequenceId;
7981 }
7982
7983
7984
7985
7986
7987 private void setSequenceId(long value) {
7988 this.sequenceId.set(value);
7989 }
7990
7991 @VisibleForTesting class RowLockContext {
7992 private final HashedBytes row;
7993 private final CountDownLatch latch = new CountDownLatch(1);
7994 private final Thread thread;
7995 private int lockCount = 0;
7996
7997 RowLockContext(HashedBytes row) {
7998 this.row = row;
7999 this.thread = Thread.currentThread();
8000 }
8001
8002 boolean ownedByCurrentThread() {
8003 return thread == Thread.currentThread();
8004 }
8005
8006 RowLock newLock() {
8007 lockCount++;
8008 RowLockImpl rl = new RowLockImpl();
8009 rl.setContext(this);
8010 return rl;
8011 }
8012
8013 void releaseLock() {
8014 if (!ownedByCurrentThread()) {
8015 throw new IllegalArgumentException("Lock held by thread: " + thread
8016 + " cannot be released by different thread: " + Thread.currentThread());
8017 }
8018 lockCount--;
8019 if (lockCount == 0) {
8020
8021 RowLockContext existingContext = lockedRows.remove(row);
8022 if (existingContext != this) {
8023 throw new RuntimeException(
8024 "Internal row lock state inconsistent, should not happen, row: " + row);
8025 }
8026 latch.countDown();
8027 }
8028 }
8029
8030 @Override
8031 public String toString() {
8032 return "RowLockContext{" +
8033 "row=" + row +
8034 ", count=" + lockCount +
8035 ", threadName=" + thread.getName() +
8036 '}';
8037 }
8038
8039 }
8040
8041 public static class RowLockImpl implements RowLock {
8042 private RowLockContext context;
8043 private boolean released = false;
8044
8045 @VisibleForTesting
8046 public RowLockContext getContext() {
8047 return context;
8048 }
8049
8050 @VisibleForTesting
8051 public void setContext(RowLockContext context) {
8052 this.context = context;
8053 }
8054
8055 @Override
8056 public void release() {
8057 if (!released) {
8058 context.releaseLock();
8059 }
8060 released = true;
8061 }
8062 }
8063
8064
8065
8066
8067
8068
8069
8070
8071
8072
8073 private WALKey appendEmptyEdit(final WAL wal, List<Cell> cells) throws IOException {
8074
8075 @SuppressWarnings("deprecation")
8076 WALKey key = new HLogKey(getRegionInfo().getEncodedNameAsBytes(), getRegionInfo().getTable(),
8077 WALKey.NO_SEQUENCE_ID, 0, null, HConstants.NO_NONCE, HConstants.NO_NONCE);
8078
8079
8080 wal.append(getTableDesc(), getRegionInfo(), key, WALEdit.EMPTY_WALEDIT, getSequenceId(), false,
8081 cells);
8082 return key;
8083 }
8084
8085
8086
8087
/**
 * Configuration-change callback. The body is empty: this method itself takes no action when
 * the configuration changes.
 *
 * @param conf the updated configuration (unused here)
 */
@Override
public void onConfigurationChange(Configuration conf) {
  // Intentionally empty.
}
8092
8093
8094
8095
8096 @Override
8097 public void registerChildren(ConfigurationManager manager) {
8098 configurationManager = Optional.of(manager);
8099 for (Store s : this.stores.values()) {
8100 configurationManager.get().registerObserver(s);
8101 }
8102 }
8103
8104
8105
8106
8107 @Override
8108 public void deregisterChildren(ConfigurationManager manager) {
8109 for (Store s : this.stores.values()) {
8110 configurationManager.get().deregisterObserver(s);
8111 }
8112 }
8113
8114
8115
8116
8117 public RegionSplitPolicy getSplitPolicy() {
8118 return this.splitPolicy;
8119 }
8120
8121
8122 }