1   
2   
3   
4   
5   
6   
7   
8   
9   
10  
11  
12  
13  
14  
15  
16  
17  
18  
19  package org.apache.hadoop.hbase.regionserver;
20  
21  import java.io.EOFException;
22  import java.io.FileNotFoundException;
23  import java.io.IOException;
24  import java.io.InterruptedIOException;
25  import java.io.UnsupportedEncodingException;
26  import java.lang.reflect.Constructor;
27  import java.text.ParseException;
28  import java.util.AbstractList;
29  import java.util.ArrayList;
30  import java.util.Arrays;
31  import java.util.Collection;
32  import java.util.Collections;
33  import java.util.Comparator;
34  import java.util.HashMap;
35  import java.util.HashSet;
36  import java.util.Iterator;
37  import java.util.List;
38  import java.util.Map;
39  import java.util.Map.Entry;
40  import java.util.NavigableMap;
41  import java.util.NavigableSet;
42  import java.util.RandomAccess;
43  import java.util.Set;
44  import java.util.TreeMap;
45  import java.util.concurrent.Callable;
46  import java.util.concurrent.CompletionService;
47  import java.util.concurrent.ConcurrentHashMap;
48  import java.util.concurrent.ConcurrentMap;
49  import java.util.concurrent.ConcurrentSkipListMap;
50  import java.util.concurrent.CountDownLatch;
51  import java.util.concurrent.ExecutionException;
52  import java.util.concurrent.ExecutorCompletionService;
53  import java.util.concurrent.ExecutorService;
54  import java.util.concurrent.Executors;
55  import java.util.concurrent.Future;
56  import java.util.concurrent.FutureTask;
57  import java.util.concurrent.ThreadFactory;
58  import java.util.concurrent.ThreadPoolExecutor;
59  import java.util.concurrent.TimeUnit;
60  import java.util.concurrent.TimeoutException;
61  import java.util.concurrent.atomic.AtomicBoolean;
62  import java.util.concurrent.atomic.AtomicInteger;
63  import java.util.concurrent.atomic.AtomicLong;
64  import java.util.concurrent.locks.Lock;
65  import java.util.concurrent.locks.ReentrantReadWriteLock;
66  
67  import org.apache.commons.lang.RandomStringUtils;
68  import org.apache.commons.logging.Log;
69  import org.apache.commons.logging.LogFactory;
70  import org.apache.hadoop.conf.Configuration;
71  import org.apache.hadoop.fs.FileStatus;
72  import org.apache.hadoop.fs.FileSystem;
73  import org.apache.hadoop.fs.Path;
74  import org.apache.hadoop.hbase.Cell;
75  import org.apache.hadoop.hbase.CellScanner;
76  import org.apache.hadoop.hbase.CellUtil;
77  import org.apache.hadoop.hbase.CompoundConfiguration;
78  import org.apache.hadoop.hbase.DoNotRetryIOException;
79  import org.apache.hadoop.hbase.DroppedSnapshotException;
80  import org.apache.hadoop.hbase.HBaseConfiguration;
81  import org.apache.hadoop.hbase.HColumnDescriptor;
82  import org.apache.hadoop.hbase.HConstants;
83  import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
84  import org.apache.hadoop.hbase.HDFSBlocksDistribution;
85  import org.apache.hadoop.hbase.HRegionInfo;
86  import org.apache.hadoop.hbase.HTableDescriptor;
87  import org.apache.hadoop.hbase.KeyValue;
88  import org.apache.hadoop.hbase.KeyValueUtil;
89  import org.apache.hadoop.hbase.NamespaceDescriptor;
90  import org.apache.hadoop.hbase.NotServingRegionException;
91  import org.apache.hadoop.hbase.RegionTooBusyException;
92  import org.apache.hadoop.hbase.TableName;
93  import org.apache.hadoop.hbase.Tag;
94  import org.apache.hadoop.hbase.TagType;
95  import org.apache.hadoop.hbase.UnknownScannerException;
96  import org.apache.hadoop.hbase.backup.HFileArchiver;
97  import org.apache.hadoop.hbase.classification.InterfaceAudience;
98  import org.apache.hadoop.hbase.client.Append;
99  import org.apache.hadoop.hbase.client.Delete;
100 import org.apache.hadoop.hbase.client.Durability;
101 import org.apache.hadoop.hbase.client.Get;
102 import org.apache.hadoop.hbase.client.Increment;
103 import org.apache.hadoop.hbase.client.IsolationLevel;
104 import org.apache.hadoop.hbase.client.Mutation;
105 import org.apache.hadoop.hbase.client.Put;
106 import org.apache.hadoop.hbase.client.RegionReplicaUtil;
107 import org.apache.hadoop.hbase.client.Result;
108 import org.apache.hadoop.hbase.client.RowMutations;
109 import org.apache.hadoop.hbase.client.Scan;
110 import org.apache.hadoop.hbase.conf.ConfigurationManager;
111 import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver;
112 import org.apache.hadoop.hbase.coprocessor.RegionObserver;
113 import org.apache.hadoop.hbase.errorhandling.ForeignExceptionSnare;
114 import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
115 import org.apache.hadoop.hbase.exceptions.RegionInRecoveryException;
116 import org.apache.hadoop.hbase.exceptions.UnknownProtocolException;
117 import org.apache.hadoop.hbase.filter.ByteArrayComparable;
118 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
119 import org.apache.hadoop.hbase.filter.FilterWrapper;
120 import org.apache.hadoop.hbase.filter.IncompatibleFilterException;
121 import org.apache.hadoop.hbase.io.HeapSize;
122 import org.apache.hadoop.hbase.io.TimeRange;
123 import org.apache.hadoop.hbase.io.hfile.BlockCache;
124 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
125 import org.apache.hadoop.hbase.io.hfile.HFile;
126 import org.apache.hadoop.hbase.ipc.CallerDisconnectedException;
127 import org.apache.hadoop.hbase.ipc.RpcCallContext;
128 import org.apache.hadoop.hbase.ipc.RpcServer;
129 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
130 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
131 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
132 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse.CompactionState;
133 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
134 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceCall;
135 import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.RegionLoad;
136 import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.StoreSequenceId;
137 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
138 import org.apache.hadoop.hbase.protobuf.generated.WALProtos;
139 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
140 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor;
141 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor.FlushAction;
142 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor.StoreFlushDescriptor;
143 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor;
144 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor.EventType;
145 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor;
146 import org.apache.hadoop.hbase.regionserver.MultiVersionConsistencyControl.WriteEntry;
147 import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
148 import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState;
149 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
150 import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController;
151 import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputControllerFactory;
152 import org.apache.hadoop.hbase.regionserver.compactions.NoLimitCompactionThroughputController;
153 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
154 import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL;
155 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
156 import org.apache.hadoop.hbase.regionserver.wal.ReplayHLogKey;
157 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
158 import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
159 import org.apache.hadoop.hbase.security.User;
160 import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
161 import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
162 import org.apache.hadoop.hbase.util.ByteStringer;
163 import org.apache.hadoop.hbase.util.Bytes;
164 import org.apache.hadoop.hbase.util.CancelableProgressable;
165 import org.apache.hadoop.hbase.util.ClassSize;
166 import org.apache.hadoop.hbase.util.CompressionTest;
167 import org.apache.hadoop.hbase.util.Counter;
168 import org.apache.hadoop.hbase.util.EncryptionTest;
169 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
170 import org.apache.hadoop.hbase.util.FSTableDescriptors;
171 import org.apache.hadoop.hbase.util.FSUtils;
172 import org.apache.hadoop.hbase.util.HashedBytes;
173 import org.apache.hadoop.hbase.util.Pair;
174 import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
175 import org.apache.hadoop.hbase.util.Threads;
176 import org.apache.hadoop.hbase.wal.WAL;
177 import org.apache.hadoop.hbase.wal.WALFactory;
178 import org.apache.hadoop.hbase.wal.WALKey;
179 import org.apache.hadoop.hbase.wal.WALSplitter;
180 import org.apache.hadoop.hbase.wal.WALSplitter.MutationReplay;
181 import org.apache.hadoop.io.MultipleIOException;
182 import org.apache.hadoop.util.StringUtils;
183 import org.apache.htrace.Trace;
184 import org.apache.htrace.TraceScope;
185 
186 import com.google.common.annotations.VisibleForTesting;
187 import com.google.common.base.Optional;
188 import com.google.common.base.Preconditions;
189 import com.google.common.collect.Lists;
190 import com.google.common.collect.Maps;
191 import com.google.common.io.Closeables;
192 import com.google.protobuf.ByteString;
193 import com.google.protobuf.Descriptors;
194 import com.google.protobuf.Message;
195 import com.google.protobuf.RpcCallback;
196 import com.google.protobuf.RpcController;
197 import com.google.protobuf.Service;
198 import com.google.protobuf.TextFormat;
199 
@InterfaceAudience.Private
public class HRegion implements HeapSize, PropagatingConfigurationObserver, Region {
  public static final Log LOG = LogFactory.getLog(HRegion.class);

  /** Config key: whether column families are loaded on demand during scans. */
  public static final String LOAD_CFS_ON_DEMAND_CONFIG_KEY =
    "hbase.hregion.scan.loadColumnFamiliesOnDemand";

  // Longest time, in ms, to block waiting for a sequence id to be assigned
  // (configured via MAX_WAIT_FOR_SEQ_ID_KEY below).
  private final int maxWaitForSeqId;
  private static final String MAX_WAIT_FOR_SEQ_ID_KEY = "hbase.hregion.max.wait.for.sequenceid.ms";
  private static final int DEFAULT_MAX_WAIT_FOR_SEQ_ID = 30000;

  /** Durability applied when the table descriptor asks for USE_DEFAULT. */
  private static final Durability DEFAULT_DURABILITY = Durability.SYNC_WAL;

  // Set once close() has completed; the region serves nothing afterwards.
  final AtomicBoolean closed = new AtomicBoolean(false);

  // Set while close() is running so in-flight operations can bail out early;
  // 'closed' above is only flipped after the close finishes. Both are reset
  // to false on (re)open — see initializeRegionInternals().
  final AtomicBoolean closing = new AtomicBoolean(false);

  // Highest sequence id known to be durable in store files for this region;
  // NO_SEQNUM until the first flush completes.
  private volatile long maxFlushedSeqId = HConstants.NO_SEQNUM;

  // Sequence id assigned to the most recent flush operation itself.
  // NOTE(review): exact relationship to maxFlushedSeqId is set at the flush
  // sites, which are outside this chunk — confirm there.
  private volatile long lastFlushOpSeqId = HConstants.NO_SEQNUM;

  // Source of sequence ids for edits to this region; -1 until the region is
  // initialized (see mvcc/initializeRegionStores).
  private final AtomicLong sequenceId = new AtomicLong(-1L);

  // Sequence ids of the last replayed region-open and compaction events;
  // -1 until set. Used when replaying WAL edits to recognize stale events —
  // NOTE(review): consumers are outside this chunk; confirm semantics there.
  protected volatile long lastReplayedOpenRegionSeqId = -1L;
  protected volatile long lastReplayedCompactionSeqId = -1L;
264 
265   
266   
267   
268   
269   
270   
271   
272   
  // Currently locked rows, keyed by the (hashed) row key. The per-row context
  // lives in RowLockContext — NOTE(review): its contents are defined elsewhere.
  private final ConcurrentHashMap<HashedBytes, RowLockContext> lockedRows =
      new ConcurrentHashMap<HashedBytes, RowLockContext>();

  // Column-family name -> Store, ordered by raw byte comparison.
  protected final Map<byte[], Store> stores = new ConcurrentSkipListMap<byte[], Store>(
      Bytes.BYTES_RAWCOMPARATOR);

  // Registered coprocessor protobuf services, keyed by service name.
  private Map<String, Service> coprocessorServiceHandlers = Maps.newHashMap();

  // Bytes currently held in this region's memstores (all families combined).
  public final AtomicLong memstoreSize = new AtomicLong(0);

  // Counters for mutations applied without writing to the WAL.
  final Counter numMutationsWithoutWAL = new Counter();
  final Counter dataInMemoryWithoutWAL = new Counter();

  // Outcomes of checkAndMutate operations.
  final Counter checkAndMutateChecksPassed = new Counter();
  final Counter checkAndMutateChecksFailed = new Counter();

  // Region-level request counters.
  final Counter readRequestsCount = new Counter();
  final Counter writeRequestsCount = new Counter();

  // Requests blocked because the memstore exceeded blockingMemStoreSize.
  private final Counter blockedRequestsCount = new Counter();

  // Compaction statistics.
  final AtomicLong compactionsFinished = new AtomicLong(0L);
  final AtomicLong compactionNumFilesCompacted = new AtomicLong(0L);
  final AtomicLong compactionNumBytesCompacted = new AtomicLong(0L);

  private final WAL wal;
  private final HRegionFileSystem fs;
  protected final Configuration conf;    // base conf + table-descriptor overrides
  private final Configuration baseConf;  // the untouched original configuration
  private final KeyValue.KVComparator comparator;
  private final int rowLockWaitDuration; // ms to wait when acquiring a row lock
  static final int DEFAULT_ROWLOCK_WAIT_DURATION = 30000;

  // How long callers wait when the region is busy; read from
  // "hbase.busy.wait.duration" in the constructor.
  final long busyWaitDuration;
  static final long DEFAULT_BUSY_WAIT_DURATION = HConstants.DEFAULT_HBASE_RPC_TIMEOUT;

  // Multiplier cap for busyWaitDuration; the constructor rejects configs where
  // busyWaitDuration * maxBusyWaitMultiplier is not positive.
  final int maxBusyWaitMultiplier;

  // Hard upper bound on busy-wait time, from
  // "hbase.ipc.client.call.purge.timeout".
  final long maxBusyWaitDuration;

  // Default timeout for RowProcessor executions (ms).
  static final long DEFAULT_ROW_PROCESSOR_TIMEOUT = 60 * 1000L;
  final ExecutorService rowProcessorExecutor = Executors.newCachedThreadPool();

  // Read point of every open RegionScanner; consulted by getSmallestReadPoint().
  private final ConcurrentHashMap<RegionScanner, Long> scannerReadPoints;
335 
336   
337 
338 
  // Sequence id at which this region was opened; NO_SEQNUM until assigned.
  private long openSeqNum = HConstants.NO_SEQNUM;

  // Default for on-demand column family loading during scans; read from
  // LOAD_CFS_ON_DEMAND_CONFIG_KEY in the constructor (default true).
  private boolean isLoadingCfsOnDemandDefault = false;

  // Counts of major/minor compactions currently in progress.
  private final AtomicInteger majorInProgress = new AtomicInteger(0);
  private final AtomicInteger minorInProgress = new AtomicInteger(0);

  // Per-family max sequence id observed when the stores were opened
  // (family name -> seq id); also passed to recovered-edits replay so
  // already-persisted edits can be skipped.
  Map<byte[], Long> maxSeqIdInStores = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);

  // State saved from a prepared (but not yet committed) flush, if any.
  private PrepareFlushResult prepareFlushResult = null;

  // When true, writes are rejected while the region is recovering; read from
  // DISALLOW_WRITES_IN_RECOVERING in the constructor.
  private boolean disallowWritesInRecovering = false;

  // True while this region is recovering from a failed-over server's WALs;
  // set in the constructor when rsServices lists us among recovering regions.
  private volatile boolean isRecovering = false;

  // Present once this region registers with a ConfigurationManager for
  // online configuration changes; absent otherwise.
  private volatile Optional<ConfigurationManager> configurationManager;
370 
371   
372 
373 
374 
375 
376   public long getSmallestReadPoint() {
377     long minimumReadPoint;
378     
379     
380     
381     synchronized(scannerReadPoints) {
382       minimumReadPoint = mvcc.memstoreReadPoint();
383 
384       for (Long readPoint: this.scannerReadPoints.values()) {
385         if (readPoint < minimumReadPoint) {
386           minimumReadPoint = readPoint;
387         }
388       }
389     }
390     return minimumReadPoint;
391   }
392 
393   
394 
395 
396 
397   static class WriteState {
398     
399     volatile boolean flushing = false;
400     
401     volatile boolean flushRequested = false;
402     
403     volatile int compacting = 0;
404     
405     volatile boolean writesEnabled = true;
406     
407     volatile boolean readOnly = false;
408     
409     
410     volatile boolean readsEnabled = true;
411 
412     
413 
414 
415 
416 
417     synchronized void setReadOnly(final boolean onOff) {
418       this.writesEnabled = !onOff;
419       this.readOnly = onOff;
420     }
421 
422     boolean isReadOnly() {
423       return this.readOnly;
424     }
425 
426     boolean isFlushRequested() {
427       return this.flushRequested;
428     }
429 
430     void setReadsEnabled(boolean readsEnabled) {
431       this.readsEnabled = readsEnabled;
432     }
433 
434     static final long HEAP_SIZE = ClassSize.align(
435         ClassSize.OBJECT + 5 * Bytes.SIZEOF_BOOLEAN);
436   }
437 
438   
439 
440 
441 
442 
443 
444   public static class FlushResultImpl implements FlushResult {
445     final Result result;
446     final String failureReason;
447     final long flushSequenceId;
448     final boolean wroteFlushWalMarker;
449 
450     
451 
452 
453 
454 
455 
456 
457     FlushResultImpl(Result result, long flushSequenceId) {
458       this(result, flushSequenceId, null, false);
459       assert result == Result.FLUSHED_NO_COMPACTION_NEEDED || result == Result
460           .FLUSHED_COMPACTION_NEEDED;
461     }
462 
463     
464 
465 
466 
467 
468     FlushResultImpl(Result result, String failureReason, boolean wroteFlushMarker) {
469       this(result, -1, failureReason, wroteFlushMarker);
470       assert result == Result.CANNOT_FLUSH_MEMSTORE_EMPTY || result == Result.CANNOT_FLUSH;
471     }
472 
473     
474 
475 
476 
477 
478 
479     FlushResultImpl(Result result, long flushSequenceId, String failureReason,
480       boolean wroteFlushMarker) {
481       this.result = result;
482       this.flushSequenceId = flushSequenceId;
483       this.failureReason = failureReason;
484       this.wroteFlushWalMarker = wroteFlushMarker;
485     }
486 
487     
488 
489 
490 
491 
492     @Override
493     public boolean isFlushSucceeded() {
494       return result == Result.FLUSHED_NO_COMPACTION_NEEDED || result == Result
495           .FLUSHED_COMPACTION_NEEDED;
496     }
497 
498     
499 
500 
501 
502     @Override
503     public boolean isCompactionNeeded() {
504       return result == Result.FLUSHED_COMPACTION_NEEDED;
505     }
506 
507     @Override
508     public String toString() {
509       return new StringBuilder()
510         .append("flush result:").append(result).append(", ")
511         .append("failureReason:").append(failureReason).append(",")
512         .append("flush seq id").append(flushSequenceId).toString();
513     }
514 
515     @Override
516     public Result getResult() {
517       return result;
518     }
519   }
520 
521   
522   @VisibleForTesting
523   static class PrepareFlushResult {
524     final FlushResult result; 
525     final TreeMap<byte[], StoreFlushContext> storeFlushCtxs;
526     final TreeMap<byte[], List<Path>> committedFiles;
527     final TreeMap<byte[], Long> storeFlushableSize;
528     final long startTime;
529     final long flushOpSeqId;
530     final long flushedSeqId;
531     final long totalFlushableSize;
532 
533     
534     PrepareFlushResult(FlushResult result, long flushSeqId) {
535       this(result, null, null, null, Math.max(0, flushSeqId), 0, 0, 0);
536     }
537 
538     
539     PrepareFlushResult(
540       TreeMap<byte[], StoreFlushContext> storeFlushCtxs,
541       TreeMap<byte[], List<Path>> committedFiles,
542       TreeMap<byte[], Long> storeFlushableSize, long startTime, long flushSeqId,
543       long flushedSeqId, long totalFlushableSize) {
544       this(null, storeFlushCtxs, committedFiles, storeFlushableSize, startTime,
545         flushSeqId, flushedSeqId, totalFlushableSize);
546     }
547 
548     private PrepareFlushResult(
549       FlushResult result,
550       TreeMap<byte[], StoreFlushContext> storeFlushCtxs,
551       TreeMap<byte[], List<Path>> committedFiles,
552       TreeMap<byte[], Long> storeFlushableSize, long startTime, long flushSeqId,
553       long flushedSeqId, long totalFlushableSize) {
554       this.result = result;
555       this.storeFlushCtxs = storeFlushCtxs;
556       this.committedFiles = committedFiles;
557       this.storeFlushableSize = storeFlushableSize;
558       this.startTime = startTime;
559       this.flushOpSeqId = flushSeqId;
560       this.flushedSeqId = flushedSeqId;
561       this.totalFlushableSize = totalFlushableSize;
562     }
563 
564     public FlushResult getResult() {
565       return this.result;
566     }
567   }
568 
  final WriteState writestate = new WriteState();

  long memstoreFlushSize;       // flush a memstore once it exceeds this size
  final long timestampSlop;     // allowed future-timestamp skew, ms (LATEST_TIMESTAMP = unchecked)
  final long rowProcessorTimeout;

  // Last flush time (ms) per Store; seeded at open in initializeRegionInternals.
  private final ConcurrentMap<Store, Long> lastStoreFlushTimeMap =
      new ConcurrentHashMap<Store, Long>();

  final RegionServerServices rsServices;
  private RegionServerAccounting rsAccounting;
  private long flushCheckInterval;
  // Flush once this many changes have accumulated in the memstore.
  private long flushPerChanges;
  private long blockingMemStoreSize; // block updates once memstore exceeds this
  final long threadWakeFrequency;
  // Region-scope lock — NOTE(review): acquisition roles (operations vs close)
  // are at call sites outside this chunk; confirm there.
  final ReentrantReadWriteLock lock =
    new ReentrantReadWriteLock();

  // Coordinates individual updates against flush/bulk operations —
  // NOTE(review): exact read/write roles live at the use sites; confirm.
  private final ReentrantReadWriteLock updatesLock =
    new ReentrantReadWriteLock();
  private boolean splitRequest;
  private byte[] explicitSplitPoint = null;

  private final MultiVersionConsistencyControl mvcc =
      new MultiVersionConsistencyControl();

  // Coprocessor host; null when not hosted by a region server (e.g. tests).
  private RegionCoprocessorHost coprocessorHost;

  private HTableDescriptor htableDescriptor = null;
  private RegionSplitPolicy splitPolicy;
  private FlushPolicy flushPolicy;

  private final MetricsRegion metricsRegion;
  private final MetricsRegionWrapperImpl metricsRegionWrapper;
  private final Durability durability;      // effective durability for this region
  private final boolean regionStatsEnabled; // client backpressure stats; always off for system tables
610 
611   
612 
613 
614 
615 
616 
617 
618 
619 
620 
621 
622 
623 
624 
625 
626 
627 
628 
629 
630 
  /**
   * Backward-compatible constructor: builds the {@link HRegionFileSystem}
   * from the raw pieces and delegates to the primary constructor.
   *
   * @param tableDir directory of the table this region belongs to
   * @param wal the WAL for this region
   * @param fs the filesystem the region data lives on
   * @param confParam base configuration (must not be a CompoundConfiguration)
   * @param regionInfo info describing this region
   * @param htd the table descriptor
   * @param rsServices hosting region-server services, or null
   * @deprecated Use the constructor taking an {@link HRegionFileSystem}.
   */
  @Deprecated
  public HRegion(final Path tableDir, final WAL wal, final FileSystem fs,
      final Configuration confParam, final HRegionInfo regionInfo,
      final HTableDescriptor htd, final RegionServerServices rsServices) {
    this(new HRegionFileSystem(confParam, fs, tableDir, regionInfo),
      wal, confParam, htd, rsServices);
  }
638 
639   
640 
641 
642 
643 
644 
645 
646 
647 
648 
649 
650 
651 
652 
653 
654 
  /**
   * Primary HRegion constructor. Layers the table descriptor's configuration
   * on top of the supplied base configuration, reads all region tuning knobs,
   * wires up metrics and the coprocessor host when region-server services are
   * present, and marks the region recovering if the server lists it as such.
   *
   * @param fs region filesystem abstraction (also supplies the region info)
   * @param wal the WAL for this region
   * @param confParam base configuration; must NOT already be a
   *          CompoundConfiguration — we build our own compound view here
   * @param htd the table descriptor; required
   * @param rsServices hosting region-server services, or null (e.g. in tests)
   */
  public HRegion(final HRegionFileSystem fs, final WAL wal, final Configuration confParam,
      final HTableDescriptor htd, final RegionServerServices rsServices) {
    if (htd == null) {
      throw new IllegalArgumentException("Need table descriptor");
    }

    if (confParam instanceof CompoundConfiguration) {
      throw new IllegalArgumentException("Need original base configuration");
    }

    this.comparator = fs.getRegionInfo().getComparator();
    this.wal = wal;
    this.fs = fs;

    // 'conf' is the user configuration with table-level overrides applied on
    // top; 'baseConf' keeps the untouched original for later reference.
    this.baseConf = confParam;
    this.conf = new CompoundConfiguration()
      .add(confParam)
      .addStringMap(htd.getConfiguration())
      .addWritableMap(htd.getValues());
    this.flushCheckInterval = conf.getInt(MEMSTORE_PERIODIC_FLUSH_INTERVAL,
        DEFAULT_CACHE_FLUSH_INTERVAL);
    this.flushPerChanges = conf.getLong(MEMSTORE_FLUSH_PER_CHANGES, DEFAULT_FLUSH_PER_CHANGES);
    if (this.flushPerChanges > MAX_FLUSH_PER_CHANGES) {
      throw new IllegalArgumentException(MEMSTORE_FLUSH_PER_CHANGES + " can not exceed "
          + MAX_FLUSH_PER_CHANGES);
    }
    this.rowLockWaitDuration = conf.getInt("hbase.rowlock.wait.duration",
                    DEFAULT_ROWLOCK_WAIT_DURATION);

    this.maxWaitForSeqId = conf.getInt(MAX_WAIT_FOR_SEQ_ID_KEY, DEFAULT_MAX_WAIT_FOR_SEQ_ID);
    this.isLoadingCfsOnDemandDefault = conf.getBoolean(LOAD_CFS_ON_DEMAND_CONFIG_KEY, true);
    this.htableDescriptor = htd;
    this.rsServices = rsServices;
    this.threadWakeFrequency = conf.getLong(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
    // setHTableSpecificConf() must run after htableDescriptor is assigned:
    // it derives memstoreFlushSize and blockingMemStoreSize from it.
    setHTableSpecificConf();
    this.scannerReadPoints = new ConcurrentHashMap<RegionScanner, Long>();

    this.busyWaitDuration = conf.getLong(
      "hbase.busy.wait.duration", DEFAULT_BUSY_WAIT_DURATION);
    this.maxBusyWaitMultiplier = conf.getInt("hbase.busy.wait.multiplier.max", 2);
    if (busyWaitDuration * maxBusyWaitMultiplier <= 0L) {
      throw new IllegalArgumentException("Invalid hbase.busy.wait.duration ("
        + busyWaitDuration + ") or hbase.busy.wait.multiplier.max ("
        + maxBusyWaitMultiplier + "). Their product should be positive");
    }
    this.maxBusyWaitDuration = conf.getLong("hbase.ipc.client.call.purge.timeout",
      2 * HConstants.DEFAULT_HBASE_RPC_TIMEOUT);

    // Allowed skew of cell timestamps into the future, in ms. The default of
    // LATEST_TIMESTAMP effectively disables the check unless configured —
    // NOTE(review): enforcement happens at the mutation sites; confirm there.
    this.timestampSlop = conf.getLong(
        "hbase.hregion.keyvalue.timestamp.slop.millisecs",
        HConstants.LATEST_TIMESTAMP);

    // Timeout applied to RowProcessor tasks run on rowProcessorExecutor.
    this.rowProcessorTimeout = conf.getLong(
        "hbase.hregion.row.processor.timeout", DEFAULT_ROW_PROCESSOR_TIMEOUT);
    this.durability = htd.getDurability() == Durability.USE_DEFAULT
        ? DEFAULT_DURABILITY
        : htd.getDurability();
    if (rsServices != null) {
      this.rsAccounting = this.rsServices.getRegionServerAccounting();
      // Only hosted regions get a coprocessor host and metrics; standalone
      // (test) instances leave metrics fields null.
      this.coprocessorHost = new RegionCoprocessorHost(this, rsServices, conf);
      this.metricsRegionWrapper = new MetricsRegionWrapperImpl(this);
      this.metricsRegion = new MetricsRegion(this.metricsRegionWrapper);

      Map<String, Region> recoveringRegions = rsServices.getRecoveringRegions();
      String encodedName = getRegionInfo().getEncodedName();
      if (recoveringRegions != null && recoveringRegions.containsKey(encodedName)) {
        this.isRecovering = true;
        // Publish ourselves so the server can clear the recovering flag later.
        recoveringRegions.put(encodedName, this);
      }
    } else {
      this.metricsRegionWrapper = null;
      this.metricsRegion = null;
    }
    if (LOG.isDebugEnabled()) {
      // Write out region name as string and its encoded name.
      LOG.debug("Instantiated " + this);
    }

    // Whether writes are rejected while the region is still in recovering state.
    this.disallowWritesInRecovering =
        conf.getBoolean(HConstants.DISALLOW_WRITES_IN_RECOVERING,
          HConstants.DEFAULT_DISALLOW_WRITES_IN_RECOVERING_CONFIG);
    configurationManager = Optional.absent();

    // Client back-pressure statistics are never reported for system tables.
    this.regionStatsEnabled = htd.getTableName().getNamespaceAsString().equals(
        NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR) ?
          false :
          conf.getBoolean(HConstants.ENABLE_CLIENT_BACKPRESSURE,
              HConstants.DEFAULT_ENABLE_CLIENT_BACKPRESSURE);
  }
759 
760   void setHTableSpecificConf() {
761     if (this.htableDescriptor == null) return;
762     long flushSize = this.htableDescriptor.getMemStoreFlushSize();
763 
764     if (flushSize <= 0) {
765       flushSize = conf.getLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE,
766         HTableDescriptor.DEFAULT_MEMSTORE_FLUSH_SIZE);
767     }
768     this.memstoreFlushSize = flushSize;
769     this.blockingMemStoreSize = this.memstoreFlushSize *
770         conf.getLong(HConstants.HREGION_MEMSTORE_BLOCK_MULTIPLIER,
771                 HConstants.DEFAULT_HREGION_MEMSTORE_BLOCK_MULTIPLIER);
772   }
773 
774   
775 
776 
777 
778 
779 
780 
781 
  /**
   * Initialize this region without a progress reporter.
   *
   * @return What the next sequence (edit) id should be.
   * @throws IOException if initialization fails
   * @deprecated callers should use the reporter-taking initialization path
   *             via the region open machinery instead of calling this directly
   */
  @Deprecated
  public long initialize() throws IOException {
    return initialize(null);
  }
786 
787   
788 
789 
790 
791 
792 
793 
  /**
   * Initialize this region, tracking progress in a MonitoredTask.
   *
   * @param reporter tickled periodically if initialization takes a while; may be null
   * @return What the next sequence (edit) id should be.
   * @throws IOException if initialization fails
   */
  private long initialize(final CancelableProgressable reporter) throws IOException {
    MonitoredTask status = TaskMonitor.get().createStatus("Initializing region " + this);
    long nextSeqId = -1;
    try {
      nextSeqId = initializeRegionInternals(reporter, status);
      return nextSeqId;
    } finally {
      // nextSeqId is still -1 only if initializeRegionInternals threw; mark
      // the monitored task aborted so it is not left showing as in-flight.
      if (nextSeqId == -1) {
        status.abort("Exception during region " + getRegionInfo().getRegionNameAsString() +
          " initialization.");
      }
    }
  }
809 
  /**
   * Performs the bulk of region open: coprocessor pre/post-open hooks,
   * persisting the region info file, opening all stores (with recovered-edits
   * replay), cleaning up temp/split/merge leftovers, installing split and
   * flush policies, and writing the region sequence-id file.
   *
   * @param reporter progress reporter, passed down to store initialization
   * @param status monitored task updated at each phase
   * @return the next sequence id to use for edits to this region
   */
  private long initializeRegionInternals(final CancelableProgressable reporter,
      final MonitoredTask status) throws IOException {
    if (coprocessorHost != null) {
      status.setStatus("Running coprocessor pre-open hook");
      coprocessorHost.preOpen();
    }

    // Persist the region info on the filesystem (used for recovery).
    status.setStatus("Writing region info on filesystem");
    fs.checkRegionInfoOnFilesystem();

    // Open all stores; this also replays recovered edits where applicable.
    status.setStatus("Initializing all the Stores");
    long maxSeqId = initializeRegionStores(reporter, status, false);
    this.lastReplayedOpenRegionSeqId = maxSeqId;

    this.writestate.setReadOnly(ServerRegionReplicaUtil.isReadOnly(this));
    this.writestate.flushRequested = false;
    this.writestate.compacting = 0;

    if (this.writestate.writesEnabled) {
      // Remove temporary data left over from old regions.
      status.setStatus("Cleaning up temporary data from old regions");
      fs.cleanupTempDir();
    }

    if (this.writestate.writesEnabled) {
      status.setStatus("Cleaning up detritus from prior splits");
      // Get rid of any splits or merges that were lost in-progress: we may
      // be opening a region that crashed in the middle of a split/merge.
      fs.cleanupAnySplitDetritus();
      fs.cleanupMergesDir();
    }

    // Initialize split policy.
    this.splitPolicy = RegionSplitPolicy.create(this, conf);

    // Initialize flush policy.
    this.flushPolicy = FlushPolicyFactory.create(this, conf);

    // Seed every store's last-flush time to "now".
    long lastFlushTime = EnvironmentEdgeManager.currentTime();
    for (Store store: stores.values()) {
      this.lastStoreFlushTimeMap.put(store, lastFlushTime);
    }

    // Start from the max seq id found in stores/recovered edits (-1 if none).
    long nextSeqid = maxSeqId;

    // Persist the sequence-id file. When recovering, reserve a large jump
    // (flushPerChanges + 10000000) — NOTE(review): the headroom appears tied
    // to out-of-order distributed log replay; confirm at WALSplitter.
    if (this.writestate.writesEnabled) {
      nextSeqid = WALSplitter.writeRegionSequenceIdFile(this.fs.getFileSystem(), this.fs
          .getRegionDir(), nextSeqid, (this.isRecovering ? (this.flushPerChanges + 10000000) : 1));
    } else {
      nextSeqid++;
    }

    LOG.info("Onlined " + this.getRegionInfo().getShortNameToLog() +
      "; next sequenceid=" + nextSeqid);

    // A region may be reopened (e.g. after a failed split): reset flags.
    this.closing.set(false);
    this.closed.set(false);

    if (coprocessorHost != null) {
      status.setStatus("Running coprocessor post-open hooks");
      coprocessorHost.postOpen();
    }

    status.markComplete("Region opened successfully");
    return nextSeqid;
  }
885 
  /**
   * Opens every column-family store in parallel, records each store's max
   * sequence id, tracks the max memstore timestamp, and — unless warming up
   * or disallowed for this replica — replays any recovered edits. Finally
   * primes the MVCC with the resulting max sequence id.
   *
   * @param reporter progress reporter passed to recovered-edits replay
   * @param status monitored task updated as each store is instantiated
   * @param warmupOnly when true, skip replaying recovered edits
   * @return the max sequence id across stores, recovered edits and memstore TS
   * @throws IOException if a store fails to open or replay fails
   */
  private long initializeRegionStores(final CancelableProgressable reporter, MonitoredTask status,
      boolean warmupOnly)
      throws IOException {

    // Load in all the HStores.

    long maxSeqId = -1;
    // Initialized to -1 so that we pick up MemstoreTS from column families.
    long maxMemstoreTS = -1;

    if (!htableDescriptor.getFamilies().isEmpty()) {
      // Thread pool for opening stores in parallel.
      ThreadPoolExecutor storeOpenerThreadPool =
        getStoreOpenAndCloseThreadPool("StoreOpener-" + this.getRegionInfo().getShortNameToLog());
      CompletionService<HStore> completionService =
        new ExecutorCompletionService<HStore>(storeOpenerThreadPool);

      // Submit one open task per column family.
      for (final HColumnDescriptor family : htableDescriptor.getFamilies()) {
        status.setStatus("Instantiating store for column family " + family);
        completionService.submit(new Callable<HStore>() {
          @Override
          public HStore call() throws IOException {
            return instantiateHStore(family);
          }
        });
      }
      boolean allStoresOpened = false;
      try {
        for (int i = 0; i < htableDescriptor.getFamilies().size(); i++) {
          Future<HStore> future = completionService.take();
          HStore store = future.get();
          this.stores.put(store.getFamily().getName(), store);

          // Track the max sequence id and per-family seq ids for replay.
          long storeMaxSequenceId = store.getMaxSequenceId();
          maxSeqIdInStores.put(store.getColumnFamilyName().getBytes(),
              storeMaxSequenceId);
          if (maxSeqId == -1 || storeMaxSequenceId > maxSeqId) {
            maxSeqId = storeMaxSequenceId;
          }
          long maxStoreMemstoreTS = store.getMaxMemstoreTS();
          if (maxStoreMemstoreTS > maxMemstoreTS) {
            maxMemstoreTS = maxStoreMemstoreTS;
          }
        }
        allStoresOpened = true;
      } catch (InterruptedException e) {
        throw (InterruptedIOException)new InterruptedIOException().initCause(e);
      } catch (ExecutionException e) {
        throw new IOException(e.getCause());
      } finally {
        storeOpenerThreadPool.shutdownNow();
        if (!allStoresOpened) {
          // Something went wrong: close any stores that did open.
          LOG.error("Could not initialize all stores for the region=" + this);
          for (Store store : this.stores.values()) {
            try {
              store.close();
            } catch (IOException e) {
              LOG.warn(e.getMessage());
            }
          }
        }
      }
    }
    if (ServerRegionReplicaUtil.shouldReplayRecoveredEdits(this) && !warmupOnly) {
      // Recover any edits if available.
      maxSeqId = Math.max(maxSeqId, replayRecoveredEditsIfAny(
          this.fs.getRegionDir(), maxSeqIdInStores, reporter, status));
    }
    maxSeqId = Math.max(maxSeqId, maxMemstoreTS + 1);
    mvcc.initialize(maxSeqId);
    return maxSeqId;
  }
960 
961   private void initializeWarmup(final CancelableProgressable reporter) throws IOException {
962     MonitoredTask status = TaskMonitor.get().createStatus("Initializing region " + this);
963 
964     
965     status.setStatus("Warming up all the Stores");
966     initializeRegionStores(reporter, status, true);
967   }
968 
969   
970 
971 
972   private NavigableMap<byte[], List<Path>> getStoreFiles() {
973     NavigableMap<byte[], List<Path>> allStoreFiles =
974       new TreeMap<byte[], List<Path>>(Bytes.BYTES_COMPARATOR);
975     for (Store store: getStores()) {
976       Collection<StoreFile> storeFiles = store.getStorefiles();
977       if (storeFiles == null) continue;
978       List<Path> storeFileNames = new ArrayList<Path>();
979       for (StoreFile storeFile: storeFiles) {
980         storeFileNames.add(storeFile.getPath());
981       }
982       allStoreFiles.put(store.getFamily().getName(), storeFileNames);
983     }
984     return allStoreFiles;
985   }
986 
987   private void writeRegionOpenMarker(WAL wal, long openSeqId) throws IOException {
988     Map<byte[], List<Path>> storeFiles = getStoreFiles();
989     RegionEventDescriptor regionOpenDesc = ProtobufUtil.toRegionEventDescriptor(
990       RegionEventDescriptor.EventType.REGION_OPEN, getRegionInfo(), openSeqId,
991       getRegionServerServices().getServerName(), storeFiles);
992     WALUtil.writeRegionEventMarker(wal, getTableDesc(), getRegionInfo(), regionOpenDesc,
993       getSequenceId());
994   }
995 
996   private void writeRegionCloseMarker(WAL wal) throws IOException {
997     Map<byte[], List<Path>> storeFiles = getStoreFiles();
998     RegionEventDescriptor regionEventDesc = ProtobufUtil.toRegionEventDescriptor(
999       RegionEventDescriptor.EventType.REGION_CLOSE, getRegionInfo(), getSequenceId().get(),
1000       getRegionServerServices().getServerName(), storeFiles);
1001     WALUtil.writeRegionEventMarker(wal, getTableDesc(), getRegionInfo(), regionEventDesc,
1002       getSequenceId());
1003 
1004     
1005     
1006     
1007     if (this.fs.getFileSystem().exists(this.fs.getRegionDir())) {
1008       WALSplitter.writeRegionSequenceIdFile(this.fs.getFileSystem(), this.fs.getRegionDir(),
1009         getSequenceId().get(), 0);
1010     }
1011   }
1012 
1013   
1014 
1015 
1016   public boolean hasReferences() {
1017     for (Store store : this.stores.values()) {
1018       if (store.hasReferences()) return true;
1019     }
1020     return false;
1021   }
1022 
1023   @Override
1024   public HDFSBlocksDistribution getHDFSBlocksDistribution() {
1025     HDFSBlocksDistribution hdfsBlocksDistribution =
1026       new HDFSBlocksDistribution();
1027     synchronized (this.stores) {
1028       for (Store store : this.stores.values()) {
1029         Collection<StoreFile> storeFiles = store.getStorefiles();
1030         if (storeFiles == null) continue;
1031         for (StoreFile sf : storeFiles) {
1032           HDFSBlocksDistribution storeFileBlocksDistribution =
1033             sf.getHDFSBlockDistribution();
1034           hdfsBlocksDistribution.add(storeFileBlocksDistribution);
1035         }
1036       }
1037     }
1038     return hdfsBlocksDistribution;
1039   }
1040 
1041   
1042 
1043 
1044 
1045 
1046 
1047 
1048 
1049   public static HDFSBlocksDistribution computeHDFSBlocksDistribution(final Configuration conf,
1050       final HTableDescriptor tableDescriptor, final HRegionInfo regionInfo) throws IOException {
1051     Path tablePath = FSUtils.getTableDir(FSUtils.getRootDir(conf), tableDescriptor.getTableName());
1052     return computeHDFSBlocksDistribution(conf, tableDescriptor, regionInfo, tablePath);
1053   }
1054 
1055   
1056 
1057 
1058 
1059 
1060 
1061 
1062 
1063 
1064   public static HDFSBlocksDistribution computeHDFSBlocksDistribution(final Configuration conf,
1065       final HTableDescriptor tableDescriptor, final HRegionInfo regionInfo,  Path tablePath)
1066       throws IOException {
1067     HDFSBlocksDistribution hdfsBlocksDistribution = new HDFSBlocksDistribution();
1068     FileSystem fs = tablePath.getFileSystem(conf);
1069 
1070     HRegionFileSystem regionFs = new HRegionFileSystem(conf, fs, tablePath, regionInfo);
1071     for (HColumnDescriptor family: tableDescriptor.getFamilies()) {
1072       Collection<StoreFileInfo> storeFiles = regionFs.getStoreFiles(family.getNameAsString());
1073       if (storeFiles == null) continue;
1074       for (StoreFileInfo storeFileInfo : storeFiles) {
1075         hdfsBlocksDistribution.add(storeFileInfo.computeHDFSBlocksDistribution(fs));
1076       }
1077     }
1078     return hdfsBlocksDistribution;
1079   }
1080 
1081   
1082 
1083 
1084 
1085 
1086   public long addAndGetGlobalMemstoreSize(long memStoreSize) {
1087     if (this.rsAccounting != null) {
1088       rsAccounting.addAndGetGlobalMemstoreSize(memStoreSize);
1089     }
1090     long size = this.memstoreSize.addAndGet(memStoreSize);
1091     
1092     
1093     if (size < 0) {
1094       LOG.error("Asked to modify this region's (" + this.toString()
1095       + ") memstoreSize to a negative value which is incorrect. Current memstoreSize="
1096       + (size-memStoreSize) + ", delta=" + memStoreSize, new Exception());
1097     }
1098     return size;
1099   }
1100 
1101   @Override
1102   public HRegionInfo getRegionInfo() {
1103     return this.fs.getRegionInfo();
1104   }
1105 
1106   
1107 
1108 
1109 
1110   RegionServerServices getRegionServerServices() {
1111     return this.rsServices;
1112   }
1113 
1114   @Override
1115   public long getReadRequestsCount() {
1116     return readRequestsCount.get();
1117   }
1118 
1119   @Override
1120   public void updateReadRequestsCount(long i) {
1121     readRequestsCount.add(i);
1122   }
1123 
1124   @Override
1125   public long getWriteRequestsCount() {
1126     return writeRequestsCount.get();
1127   }
1128 
1129   @Override
1130   public void updateWriteRequestsCount(long i) {
1131     writeRequestsCount.add(i);
1132   }
1133 
1134   @Override
1135   public long getMemstoreSize() {
1136     return memstoreSize.get();
1137   }
1138 
1139   @Override
1140   public long getNumMutationsWithoutWAL() {
1141     return numMutationsWithoutWAL.get();
1142   }
1143 
1144   @Override
1145   public long getDataInMemoryWithoutWAL() {
1146     return dataInMemoryWithoutWAL.get();
1147   }
1148 
1149   @Override
1150   public long getBlockedRequestsCount() {
1151     return blockedRequestsCount.get();
1152   }
1153 
1154   @Override
1155   public long getCheckAndMutateChecksPassed() {
1156     return checkAndMutateChecksPassed.get();
1157   }
1158 
1159   @Override
1160   public long getCheckAndMutateChecksFailed() {
1161     return checkAndMutateChecksFailed.get();
1162   }
1163 
1164   @Override
1165   public MetricsRegion getMetrics() {
1166     return metricsRegion;
1167   }
1168 
1169   @Override
1170   public boolean isClosed() {
1171     return this.closed.get();
1172   }
1173 
1174   @Override
1175   public boolean isClosing() {
1176     return this.closing.get();
1177   }
1178 
1179   @Override
1180   public boolean isReadOnly() {
1181     return this.writestate.isReadOnly();
1182   }
1183 
1184   
1185 
1186 
1187   public void setRecovering(boolean newState) {
1188     boolean wasRecovering = this.isRecovering;
1189     
1190     
1191     if (wal != null && getRegionServerServices() != null && !writestate.readOnly
1192         && wasRecovering && !newState) {
1193 
1194       
1195       boolean forceFlush = getTableDesc().getRegionReplication() > 1;
1196 
1197       
1198       MonitoredTask status = TaskMonitor.get().createStatus(
1199         "Flushing region " + this + " because recovery is finished");
1200       try {
1201         if (forceFlush) {
1202           internalFlushcache(status);
1203         }
1204 
1205         status.setStatus("Writing region open event marker to WAL because recovery is finished");
1206         try {
1207           long seqId = openSeqNum;
1208           
1209           if (wal != null) {
1210             seqId = getNextSequenceId(wal);
1211           }
1212           writeRegionOpenMarker(wal, seqId);
1213         } catch (IOException e) {
1214           
1215           
1216           LOG.warn(getRegionInfo().getEncodedName() + " : was not able to write region opening "
1217               + "event to WAL, continueing", e);
1218         }
1219       } catch (IOException ioe) {
1220         
1221         
1222         LOG.warn(getRegionInfo().getEncodedName() + " : was not able to flush "
1223             + "event to WAL, continueing", ioe);
1224       } finally {
1225         status.cleanup();
1226       }
1227     }
1228 
1229     this.isRecovering = newState;
1230     if (wasRecovering && !isRecovering) {
1231       
1232       coprocessorHost.postLogReplay();
1233     }
1234   }
1235 
1236   @Override
1237   public boolean isRecovering() {
1238     return this.isRecovering;
1239   }
1240 
1241   @Override
1242   public boolean isAvailable() {
1243     return !isClosed() && !isClosing();
1244   }
1245 
1246   
1247   public boolean isSplittable() {
1248     return isAvailable() && !hasReferences();
1249   }
1250 
1251   
1252 
1253 
1254   public boolean isMergeable() {
1255     if (!isAvailable()) {
1256       LOG.debug("Region " + getRegionInfo().getRegionNameAsString()
1257           + " is not mergeable because it is closing or closed");
1258       return false;
1259     }
1260     if (hasReferences()) {
1261       LOG.debug("Region " + getRegionInfo().getRegionNameAsString()
1262           + " is not mergeable because it has references");
1263       return false;
1264     }
1265 
1266     return true;
1267   }
1268 
1269   public boolean areWritesEnabled() {
1270     synchronized(this.writestate) {
1271       return this.writestate.writesEnabled;
1272     }
1273   }
1274 
1275    public MultiVersionConsistencyControl getMVCC() {
1276      return mvcc;
1277    }
1278 
1279    @Override
1280    public long getMaxFlushedSeqId() {
1281      return maxFlushedSeqId;
1282    }
1283 
1284    @Override
1285    public long getReadpoint(IsolationLevel isolationLevel) {
1286      if (isolationLevel == IsolationLevel.READ_UNCOMMITTED) {
1287        
1288        return Long.MAX_VALUE;
1289      }
1290      return mvcc.memstoreReadPoint();
1291    }
1292 
1293    @Override
1294    public boolean isLoadingCfsOnDemandDefault() {
1295      return this.isLoadingCfsOnDemandDefault;
1296    }
1297 
1298   
1299 
1300 
1301 
1302 
1303 
1304 
1305 
1306 
1307 
1308 
1309 
1310 
1311 
1312 
1313 
1314   public Map<byte[], List<StoreFile>> close() throws IOException {
1315     return close(false);
1316   }
1317 
  /** Serializes close() requests so only one close runs at a time. */
  private final Object closeLock = new Object();

  /** Conf key naming the periodic memstore flush interval, in milliseconds. */
  public static final String MEMSTORE_PERIODIC_FLUSH_INTERVAL =
      "hbase.regionserver.optionalcacheflushinterval";
  /** Default periodic flush interval: 3600000 ms (one hour). */
  public static final int DEFAULT_CACHE_FLUSH_INTERVAL = 3600000;
  /** Periodic flush interval for meta regions: 300000 ms (five minutes). */
  public static final int META_CACHE_FLUSH_INTERVAL = 300000; 

  /** Conf key for the number of unflushed changes that triggers a flush. */
  public static final String MEMSTORE_FLUSH_PER_CHANGES =
      "hbase.regionserver.flush.per.changes";
  /** Default change count that forces a flush: thirty million. */
  public static final long DEFAULT_FLUSH_PER_CHANGES = 30000000; 
  /**
   * Upper bound accepted for the flush-per-changes setting: one billion.
   */
  public static final long MAX_FLUSH_PER_CHANGES = 1000000000; 
1336 
1337   
1338 
1339 
1340 
1341 
1342 
1343 
1344 
1345 
1346 
1347 
1348 
1349 
1350 
1351 
1352 
1353 
1354   public Map<byte[], List<StoreFile>> close(final boolean abort) throws IOException {
1355     
1356     
1357     MonitoredTask status = TaskMonitor.get().createStatus(
1358         "Closing region " + this +
1359         (abort ? " due to abort" : ""));
1360 
1361     status.setStatus("Waiting for close lock");
1362     try {
1363       synchronized (closeLock) {
1364         return doClose(abort, status);
1365       }
1366     } finally {
1367       status.cleanup();
1368     }
1369   }
1370 
1371   
1372 
1373 
1374   @VisibleForTesting
1375   public void setClosing(boolean closing) {
1376     this.closing.set(closing);
1377   }
1378 
  /**
   * Does the actual work of closing the region: disables compactions/flushes,
   * drains the memstore (unless aborting or read-only), closes all stores in
   * parallel, writes the region-close WAL marker, and runs coprocessor hooks.
   * Caller holds {@code closeLock}; this method takes the region write lock.
   * @param abort true to skip flushing the memstore before close
   * @param status task monitor to report progress on
   * @return family name -> closed store files, or null if already closed
   */
  private Map<byte[], List<StoreFile>> doClose(final boolean abort, MonitoredTask status)
      throws IOException {
    if (isClosed()) {
      LOG.warn("Region " + this + " already closed");
      return null;
    }

    if (coprocessorHost != null) {
      status.setStatus("Running coprocessor pre-close hooks");
      this.coprocessorHost.preClose(abort);
    }

    status.setStatus("Disabling compacts and flushes for region");
    boolean canFlush = true;
    synchronized (writestate) {
      // A read-only region has nothing in the memstore to flush.
      canFlush = !writestate.readOnly;
      writestate.writesEnabled = false;
      LOG.debug("Closing " + this + ": disabling compactions & flushes");
      waitForFlushesAndCompactions();
    }
    // Pre-flush outside the write lock: shrinks the memstore so the flush done
    // under the lock (below) is short, keeping the write-lock hold time down.
    // A failed pre-flush is tolerated; the locked flush below will retry.
    if (!abort && worthPreFlushing() && canFlush) {
      status.setStatus("Pre-flushing region before close");
      LOG.info("Running close preflush of " + getRegionInfo().getRegionNameAsString());
      try {
        internalFlushcache(status);
      } catch (IOException ioe) {
        // Failed pre-flush is not fatal; the flush under the lock handles the rest.
        status.setStatus("Failed pre-flush " + this + "; " + ioe.getMessage());
      }
    }

    // From here on no new reads/writes may start: take the region write lock.
    lock.writeLock().lock();
    this.closing.set(true);
    status.setStatus("Disabling writes for close");
    try {
      if (this.isClosed()) {
        status.abort("Already got closed by another process");
        // A close raced us to it; nothing left to do.
        return null;
      }
      LOG.debug("Updates disabled for region " + this);
      // Flush the memstore until empty; a flush can leave data behind (snapshot
      // carry-over), so retry a bounded number of times before giving up.
      if (!abort && canFlush) {
        int flushCount = 0;
        while (this.memstoreSize.get() > 0) {
          try {
            if (flushCount++ > 0) {
              int actualFlushes = flushCount - 1;
              if (actualFlushes > 5) {
                // More than five retries means something is wedged; give up rather
                // than loop forever holding the write lock.
                throw new DroppedSnapshotException("Failed clearing memory after " +
                  actualFlushes + " attempts on region: " +
                    Bytes.toStringBinary(getRegionInfo().getRegionName()));
              }
              LOG.info("Running extra flush, " + actualFlushes +
                " (carrying snapshot?) " + this);
            }
            internalFlushcache(status);
          } catch (IOException ioe) {
            status.setStatus("Failed flush " + this + ", putting online again");
            synchronized (writestate) {
              writestate.writesEnabled = true;
            }
            // Abandon the close; the region goes back online with writes enabled.
            throw ioe;
          }
        }
      }

      Map<byte[], List<StoreFile>> result =
        new TreeMap<byte[], List<StoreFile>>(Bytes.BYTES_COMPARATOR);
      if (!stores.isEmpty()) {
        // Close each store in its own thread to parallelize store-file closing.
        ThreadPoolExecutor storeCloserThreadPool =
          getStoreOpenAndCloseThreadPool("StoreCloserThread-" +
            getRegionInfo().getRegionNameAsString());
        CompletionService<Pair<byte[], Collection<StoreFile>>> completionService =
          new ExecutorCompletionService<Pair<byte[], Collection<StoreFile>>>(storeCloserThreadPool);

        // Submit one close task per store; collect (family, files) pairs below.
        for (final Store store : stores.values()) {
          long flushableSize = store.getFlushableSize();
          if (!(abort || flushableSize == 0 || writestate.readOnly)) {
            getRegionServerServices().abort("Assertion failed while closing store "
                + getRegionInfo().getRegionNameAsString() + " " + store
                + ". flushableSize expected=0, actual= " + flushableSize
                + ". Current memstoreSize=" + getMemstoreSize() + ". Maybe a coprocessor "
                + "operation failed and left the memstore in a partially updated state.", null);
          }
          completionService
              .submit(new Callable<Pair<byte[], Collection<StoreFile>>>() {
                @Override
                public Pair<byte[], Collection<StoreFile>> call() throws IOException {
                  return new Pair<byte[], Collection<StoreFile>>(
                    store.getFamily().getName(), store.close());
                }
              });
        }
        try {
          for (int i = 0; i < stores.size(); i++) {
            Future<Pair<byte[], Collection<StoreFile>>> future = completionService.take();
            Pair<byte[], Collection<StoreFile>> storeFiles = future.get();
            List<StoreFile> familyFiles = result.get(storeFiles.getFirst());
            if (familyFiles == null) {
              familyFiles = new ArrayList<StoreFile>();
              result.put(storeFiles.getFirst(), familyFiles);
            }
            familyFiles.addAll(storeFiles.getSecond());
          }
        } catch (InterruptedException e) {
          throw (InterruptedIOException)new InterruptedIOException().initCause(e);
        } catch (ExecutionException e) {
          throw new IOException(e.getCause());
        } finally {
          storeCloserThreadPool.shutdownNow();
        }
      }

      status.setStatus("Writing region close event to WAL");
      if (!abort && wal != null && getRegionServerServices() != null && !writestate.readOnly) {
        writeRegionCloseMarker(wal);
      }

      this.closed.set(true);
      if (!canFlush) {
        // Read-only region: nothing was flushed, so back the memstore accounting out.
        addAndGetGlobalMemstoreSize(-memstoreSize.get());
      } else if (memstoreSize.get() != 0) {
        // After a successful flush loop this should be zero; log if not.
        LOG.error("Memstore size is " + memstoreSize.get());
      }
      if (coprocessorHost != null) {
        status.setStatus("Running coprocessor post-close hooks");
        this.coprocessorHost.postClose(abort);
      }
      if (this.metricsRegion != null) {
        this.metricsRegion.close();
      }
      if (this.metricsRegionWrapper != null) {
        Closeables.closeQuietly(this.metricsRegionWrapper);
      }
      status.markComplete("Closed");
      LOG.info("Closed " + this);
      return result;
    } finally {
      lock.writeLock().unlock();
    }
  }
1532 
  /**
   * Blocks until all in-flight flushes and compactions on this region finish.
   * Callers must NOT hold the writestate monitor when calling (this method takes it).
   * If interrupted while waiting, the interrupt is remembered and re-asserted on exit.
   */
  @Override
  public void waitForFlushesAndCompactions() {
    synchronized (writestate) {
      if (this.writestate.readOnly) {
        // A read-only region never flushes or compacts; nothing to wait for.
        return;
      }
      boolean interrupted = false;
      try {
        while (writestate.compacting > 0 || writestate.flushing) {
          LOG.debug("waiting for " + writestate.compacting + " compactions"
            + (writestate.flushing ? " & cache flush" : "") + " to complete for region " + this);
          try {
            // Woken via writestate.notifyAll() when a compaction or flush completes.
            writestate.wait();
          } catch (InterruptedException iex) {
            // Keep waiting; re-assert the interrupt status in the finally below.
            LOG.warn("Interrupted while waiting");
            interrupted = true;
          }
        }
      } finally {
        if (interrupted) {
          Thread.currentThread().interrupt();
        }
      }
    }
  }
1561 
1562   protected ThreadPoolExecutor getStoreOpenAndCloseThreadPool(
1563       final String threadNamePrefix) {
1564     int numStores = Math.max(1, this.htableDescriptor.getFamilies().size());
1565     int maxThreads = Math.min(numStores,
1566         conf.getInt(HConstants.HSTORE_OPEN_AND_CLOSE_THREADS_MAX,
1567             HConstants.DEFAULT_HSTORE_OPEN_AND_CLOSE_THREADS_MAX));
1568     return getOpenAndCloseThreadPool(maxThreads, threadNamePrefix);
1569   }
1570 
1571   protected ThreadPoolExecutor getStoreFileOpenAndCloseThreadPool(
1572       final String threadNamePrefix) {
1573     int numStores = Math.max(1, this.htableDescriptor.getFamilies().size());
1574     int maxThreads = Math.max(1,
1575         conf.getInt(HConstants.HSTORE_OPEN_AND_CLOSE_THREADS_MAX,
1576             HConstants.DEFAULT_HSTORE_OPEN_AND_CLOSE_THREADS_MAX)
1577             / numStores);
1578     return getOpenAndCloseThreadPool(maxThreads, threadNamePrefix);
1579   }
1580 
1581   static ThreadPoolExecutor getOpenAndCloseThreadPool(int maxThreads,
1582       final String threadNamePrefix) {
1583     return Threads.getBoundedCachedThreadPool(maxThreads, 30L, TimeUnit.SECONDS,
1584       new ThreadFactory() {
1585         private int count = 1;
1586 
1587         @Override
1588         public Thread newThread(Runnable r) {
1589           return new Thread(r, threadNamePrefix + "-" + count++);
1590         }
1591       });
1592   }
1593 
1594    
1595 
1596 
1597   private boolean worthPreFlushing() {
1598     return this.memstoreSize.get() >
1599       this.conf.getLong("hbase.hregion.preclose.flush.size", 1024 * 1024 * 5);
1600   }
1601 
1602   
1603   
1604   
1605 
1606   @Override
1607   public HTableDescriptor getTableDesc() {
1608     return this.htableDescriptor;
1609   }
1610 
1611   
1612   public WAL getWAL() {
1613     return this.wal;
1614   }
1615 
1616   
1617 
1618 
1619 
1620 
1621 
1622 
1623   Configuration getBaseConf() {
1624     return this.baseConf;
1625   }
1626 
1627   
1628   public FileSystem getFilesystem() {
1629     return fs.getFileSystem();
1630   }
1631 
1632   
1633   public HRegionFileSystem getRegionFileSystem() {
1634     return this.fs;
1635   }
1636 
1637   @Override
1638   public long getEarliestFlushTimeForAllStores() {
1639     return lastStoreFlushTimeMap.isEmpty() ? Long.MAX_VALUE : Collections.min(lastStoreFlushTimeMap
1640         .values());
1641   }
1642 
1643   @Override
1644   public long getOldestHfileTs(boolean majorCompactioOnly) throws IOException {
1645     long result = Long.MAX_VALUE;
1646     for (Store store : getStores()) {
1647       Collection<StoreFile> storeFiles = store.getStorefiles();
1648       if (storeFiles == null) continue;
1649       for (StoreFile file : storeFiles) {
1650         StoreFile.Reader sfReader = file.getReader();
1651         if (sfReader == null) continue;
1652         HFile.Reader reader = sfReader.getHFileReader();
1653         if (reader == null) continue;
1654         if (majorCompactioOnly) {
1655           byte[] val = reader.loadFileInfo().get(StoreFile.MAJOR_COMPACTION_KEY);
1656           if (val == null) continue;
1657           if (val == null || !Bytes.toBoolean(val)) {
1658             continue;
1659           }
1660         }
1661         result = Math.min(result, reader.getFileContext().getFileCreateTime());
1662       }
1663     }
1664     return result == Long.MAX_VALUE ? 0 : result;
1665   }
1666 
  /**
   * Populates the per-store complete-sequence-id entries on the given RegionLoad
   * builder and sets the region-wide complete sequence id.
   * @param regionLoadBldr builder to populate (its store entries are cleared first)
   * @return the same builder, for chaining
   */
  RegionLoad.Builder setCompleteSequenceId(RegionLoad.Builder regionLoadBldr) {
    long lastFlushOpSeqIdLocal = this.lastFlushOpSeqId;
    byte[] encodedRegionName = this.getRegionInfo().getEncodedNameAsBytes();
    regionLoadBldr.clearStoreCompleteSequenceId();
    for (byte[] familyName : this.stores.keySet()) {
      long oldestUnflushedSeqId = this.wal.getEarliestMemstoreSeqNum(encodedRegionName, familyName);
      // Negative value: the WAL reports no in-memstore edits for this family
      // (presumably fully flushed — verify against the WAL contract), so report the
      // last flush op's seq id; otherwise report the id just before the oldest
      // unflushed edit.
      regionLoadBldr.addStoreCompleteSequenceId(StoreSequenceId
          .newBuilder()
          .setFamilyName(ByteString.copyFrom(familyName))
          .setSequenceId(
            oldestUnflushedSeqId < 0 ? lastFlushOpSeqIdLocal : oldestUnflushedSeqId - 1).build());
    }
    return regionLoadBldr.setCompleteSequenceId(getMaxFlushedSeqId());
  }
1683 
1684   
1685   
1686   
1687   
1688   
1689   
1690 
1691   
1692   public long getLargestHStoreSize() {
1693     long size = 0;
1694     for (Store h : stores.values()) {
1695       long storeSize = h.getSize();
1696       if (storeSize > size) {
1697         size = storeSize;
1698       }
1699     }
1700     return size;
1701   }
1702 
1703   
1704 
1705 
1706   public KeyValue.KVComparator getComparator() {
1707     return this.comparator;
1708   }
1709 
1710   
1711 
1712 
1713 
  /**
   * Hook invoked before a compaction runs on this region. No-op by default;
   * subclasses may override to do preparatory work.
   */
  protected void doRegionCompactionPrep() throws IOException {
  }
1716 
1717   @Override
1718   public void triggerMajorCompaction() throws IOException {
1719     for (Store s : getStores()) {
1720       s.triggerMajorCompaction();
1721     }
1722   }
1723 
1724   @Override
1725   public void compact(final boolean majorCompaction) throws IOException {
1726     if (majorCompaction) {
1727       triggerMajorCompaction();
1728     }
1729     for (Store s : getStores()) {
1730       CompactionContext compaction = s.requestCompaction();
1731       if (compaction != null) {
1732         CompactionThroughputController controller = null;
1733         if (rsServices != null) {
1734           controller = CompactionThroughputControllerFactory.create(rsServices, conf);
1735         }
1736         if (controller == null) {
1737           controller = NoLimitCompactionThroughputController.INSTANCE;
1738         }
1739         compact(compaction, s, controller, null);
1740       }
1741     }
1742   }
1743 
1744   
1745 
1746 
1747 
1748 
1749 
1750   public void compactStores() throws IOException {
1751     for (Store s : getStores()) {
1752       CompactionContext compaction = s.requestCompaction();
1753       if (compaction != null) {
1754         compact(compaction, s, NoLimitCompactionThroughputController.INSTANCE, null);
1755       }
1756     }
1757   }
1758 
1759   
1760 
1761 
1762 
1763 
1764 
1765   @VisibleForTesting
1766   void compactStore(byte[] family, CompactionThroughputController throughputController)
1767       throws IOException {
1768     Store s = getStore(family);
1769     CompactionContext compaction = s.requestCompaction();
1770     if (compaction != null) {
1771       compact(compaction, s, throughputController, null);
1772     }
1773   }
1774 
1775   
1776 
1777 
1778 
1779 
1780 
1781 
1782 
1783 
1784 
1785 
1786 
1787 
1788 
1789 
1790   public boolean compact(CompactionContext compaction, Store store,
1791       CompactionThroughputController throughputController) throws IOException {
1792     return compact(compaction, store, throughputController, null);
1793   }
1794 
  /**
   * Runs an already-selected compaction on the given store. Takes the region read
   * lock for the duration; bumps the writestate compacting counter so close() can
   * wait for in-flight compactions. The compaction request is cancelled on every
   * early-exit path that does not hand the request to the store.
   * @param compaction the selected compaction to execute (must have a selection)
   * @param store the store the compaction belongs to
   * @param throughputController throttles compaction I/O
   * @param user user to run the compaction as; may be null
   * @return true when the compaction ran to completion, false when skipped/aborted
   */
  public boolean compact(CompactionContext compaction, Store store,
      CompactionThroughputController throughputController, User user) throws IOException {
    assert compaction != null && compaction.hasSelection();
    assert !compaction.getRequest().getFiles().isEmpty();
    if (this.closing.get() || this.closed.get()) {
      LOG.debug("Skipping compaction on " + this + " because closing/closed");
      store.cancelRequestedCompaction(compaction);
      return false;
    }
    MonitoredTask status = null;
    boolean requestNeedsCancellation = true;
    // Read lock: compactions may run concurrently with each other and with reads,
    // but not with a close (which takes the write lock).
    lock.readLock().lock();
    try {
      byte[] cf = Bytes.toBytes(store.getColumnFamilyName());
      // Identity check: the store this request was made against may have been
      // replaced (e.g. split rollback re-instantiates stores).
      if (stores.get(cf) != store) {
        LOG.warn("Store " + store.getColumnFamilyName() + " on region " + this
            + " has been re-instantiated, cancel this compaction request. "
            + " It may be caused by the roll back of split transaction");
        return false;
      }

      status = TaskMonitor.get().createStatus("Compacting " + store + " in " + this);
      if (this.closed.get()) {
        String msg = "Skipping compaction on " + this + " because closed";
        LOG.debug(msg);
        status.abort(msg);
        return false;
      }
      boolean wasStateSet = false;
      try {
        // Register this compaction in writestate so waitForFlushesAndCompactions()
        // knows work is in flight; refuse if writes are disabled (region closing).
        synchronized (writestate) {
          if (writestate.writesEnabled) {
            wasStateSet = true;
            ++writestate.compacting;
          } else {
            String msg = "NOT compacting region " + this + ". Writes disabled.";
            LOG.info(msg);
            status.abort(msg);
            return false;
          }
        }
        LOG.info("Starting compaction on " + store + " in region " + this
            + (compaction.getRequest().isOffPeak()?" as an off-peak compaction":""));
        doRegionCompactionPrep();
        try {
          status.setStatus("Compacting store " + store);
          // Once handed to the store, the store owns cancellation of the request;
          // clear our flag so the finally below does not double-cancel.
          requestNeedsCancellation = false;
          store.compact(compaction, throughputController, user);
        } catch (InterruptedIOException iioe) {
          String msg = "compaction interrupted";
          LOG.info(msg, iioe);
          status.abort(msg);
          return false;
        }
      } finally {
        if (wasStateSet) {
          synchronized (writestate) {
            --writestate.compacting;
            if (writestate.compacting <= 0) {
              writestate.notifyAll();
            }
          }
        }
      }
      status.markComplete("Compaction complete");
      return true;
    } finally {
      try {
        if (requestNeedsCancellation) store.cancelRequestedCompaction(compaction);
        if (status != null) status.cleanup();
      } finally {
        lock.readLock().unlock();
      }
    }
  }
1873 
1874   @Override
1875   public FlushResult flush(boolean force) throws IOException {
1876     return flushcache(force, false);
1877   }
1878 
1879   
1880 
1881 
1882 
1883 
1884 
1885 
1886 
1887 
1888 
1889 
1890 
1891 
1892 
1893 
1894 
1895 
1896 
1897 
1898 
1899 
1900 
  /**
   * Flushes the memstore to store files. Takes the region read lock, marks
   * writestate.flushing under the writestate monitor (refusing if a flush is
   * already running or writes are disabled), runs the flush, then clears the
   * flag and notifies waiters.
   * @param forceFlushAllStores true to flush every store; otherwise the flush
   *        policy selects which stores to flush
   * @param writeFlushRequestWalMarker whether to write a flush-request marker to the WAL
   * @return the flush result (CANNOT_FLUSH when skipped)
   */
  public FlushResult flushcache(boolean forceFlushAllStores, boolean writeFlushRequestWalMarker)
      throws IOException {
    // Fail fast before taking the lock when a close is already underway.
    if (this.closing.get()) {
      String msg = "Skipping flush on " + this + " because closing";
      LOG.debug(msg);
      return new FlushResultImpl(FlushResult.Result.CANNOT_FLUSH, msg, false);
    }
    MonitoredTask status = TaskMonitor.get().createStatus("Flushing " + this);
    status.setStatus("Acquiring readlock on region");
    // Read lock: flushes can run alongside reads/writes but not a close.
    lock.readLock().lock();
    try {
      if (this.closed.get()) {
        String msg = "Skipping flush on " + this + " because closed";
        LOG.debug(msg);
        status.abort(msg);
        return new FlushResultImpl(FlushResult.Result.CANNOT_FLUSH, msg, false);
      }
      if (coprocessorHost != null) {
        status.setStatus("Running coprocessor pre-flush hooks");
        coprocessorHost.preFlush();
      }
      // Edits applied without WAL protection become durable by this flush, so
      // reset the without-WAL counters.
      if (numMutationsWithoutWAL.get() > 0) {
        numMutationsWithoutWAL.set(0);
        dataInMemoryWithoutWAL.set(0);
      }
      synchronized (writestate) {
        if (!writestate.flushing && writestate.writesEnabled) {
          this.writestate.flushing = true;
        } else {
          // Another flush is in progress, or writes (and hence flushes) are disabled.
          if (LOG.isDebugEnabled()) {
            LOG.debug("NOT flushing memstore for region " + this
                + ", flushing=" + writestate.flushing + ", writesEnabled="
                + writestate.writesEnabled);
          }
          String msg = "Not flushing since "
              + (writestate.flushing ? "already flushing"
              : "writes not enabled");
          status.abort(msg);
          return new FlushResultImpl(FlushResult.Result.CANNOT_FLUSH, msg, false);
        }
      }

      try {
        // Either flush everything or let the flush policy pick the stores.
        Collection<Store> specificStoresToFlush =
            forceFlushAllStores ? stores.values() : flushPolicy.selectStoresToFlush();
        FlushResult fs = internalFlushcache(specificStoresToFlush,
          status, writeFlushRequestWalMarker);

        if (coprocessorHost != null) {
          status.setStatus("Running post-flush coprocessor hooks");
          coprocessorHost.postFlush();
        }

        status.markComplete("Flush successful");
        return fs;
      } finally {
        // Always clear the flushing flag and wake threads blocked in
        // waitForFlushesAndCompactions().
        synchronized (writestate) {
          writestate.flushing = false;
          this.writestate.flushRequested = false;
          writestate.notifyAll();
        }
      }
    } finally {
      lock.readLock().unlock();
      status.cleanup();
    }
  }
1972 
1973   
1974 
1975 
1976 
1977 
1978 
1979 
1980   boolean shouldFlushStore(Store store) {
1981     long maxFlushedSeqId =
1982         this.wal.getEarliestMemstoreSeqNum(getRegionInfo().getEncodedNameAsBytes(), store
1983             .getFamily().getName()) - 1;
1984     if (maxFlushedSeqId > 0 && maxFlushedSeqId + flushPerChanges < sequenceId.get()) {
1985       if (LOG.isDebugEnabled()) {
1986         LOG.debug("Column Family: " + store.getColumnFamilyName() + " of region " + this
1987             + " will be flushed because its max flushed seqId(" + maxFlushedSeqId
1988             + ") is far away from current(" + sequenceId.get() + "), max allowed is "
1989             + flushPerChanges);
1990       }
1991       return true;
1992     }
1993     if (flushCheckInterval <= 0) {
1994       return false;
1995     }
1996     long now = EnvironmentEdgeManager.currentTime();
1997     if (store.timeOfOldestEdit() < now - flushCheckInterval) {
1998       if (LOG.isDebugEnabled()) {
1999         LOG.debug("Column Family: " + store.getColumnFamilyName() + " of region " + this
2000             + " will be flushed because time of its oldest edit (" + store.timeOfOldestEdit()
2001             + ") is far away from now(" + now + "), max allowed is " + flushCheckInterval);
2002       }
2003       return true;
2004     }
2005     return false;
2006   }
2007 
2008   
2009 
2010 
2011   boolean shouldFlush() {
2012     
2013     if (this.maxFlushedSeqId > 0
2014           && (this.maxFlushedSeqId + this.flushPerChanges < this.sequenceId.get())) {
2015       return true;
2016     }
2017     long modifiedFlushCheckInterval = flushCheckInterval;
2018     if (getRegionInfo().isMetaRegion() &&
2019         getRegionInfo().getReplicaId() == HRegionInfo.DEFAULT_REPLICA_ID) {
2020       modifiedFlushCheckInterval = META_CACHE_FLUSH_INTERVAL;
2021     }
2022     if (modifiedFlushCheckInterval <= 0) { 
2023       return false;
2024     }
2025     long now = EnvironmentEdgeManager.currentTime();
2026     
2027     if ((now - getEarliestFlushTimeForAllStores() < modifiedFlushCheckInterval)) {
2028       return false;
2029     }
2030     
2031     
2032     for (Store s : getStores()) {
2033       if (s.timeOfOldestEdit() < now - modifiedFlushCheckInterval) {
2034         
2035         return true;
2036       }
2037     }
2038     return false;
2039   }
2040 
2041   
2042 
2043 
2044 
2045 
2046   private FlushResult internalFlushcache(MonitoredTask status)
2047       throws IOException {
2048     return internalFlushcache(stores.values(), status, false);
2049   }
2050 
2051   
2052 
2053 
2054 
2055 
2056   private FlushResult internalFlushcache(final Collection<Store> storesToFlush,
2057       MonitoredTask status, boolean writeFlushWalMarker) throws IOException {
2058     return internalFlushcache(this.wal, HConstants.NO_SEQNUM, storesToFlush,
2059         status, writeFlushWalMarker);
2060   }
2061 
2062   
2063 
2064 
2065 
2066 
2067 
2068 
2069 
2070 
2071 
2072 
2073 
2074 
2075 
2076 
2077 
2078 
2079 
2080 
2081 
2082 
2083 
2084 
2085 
2086 
2087 
2088 
2089 
2090   protected FlushResult internalFlushcache(final WAL wal, final long myseqid,
2091       final Collection<Store> storesToFlush, MonitoredTask status, boolean writeFlushWalMarker)
2092           throws IOException {
2093     PrepareFlushResult result
2094       = internalPrepareFlushCache(wal, myseqid, storesToFlush, status, writeFlushWalMarker);
2095     if (result.result == null) {
2096       return internalFlushCacheAndCommit(wal, status, result, storesToFlush);
2097     } else {
2098       return result.result; 
2099     }
2100   }
2101 
  /**
   * Prepare phase of a flush: under the updates write lock, snapshot the
   * memstores of {@code storesToFlush}, write a START_FLUSH marker to the WAL
   * and compute the flush-operation and flushed-up-to sequence ids.
   * @param wal WAL to write markers to; null during WAL replay, in which case
   *   {@code myseqid} supplies the sequence id
   * @param myseqid sequence id to use when {@code wal} is null
   * @param storesToFlush stores whose memstores will be snapshotted
   * @param status task monitor to report progress to
   * @param writeFlushWalMarker whether a CANNOT_FLUSH marker may be written
   *   when the memstore turns out to be empty
   * @return a PrepareFlushResult carrying either snapshot state for the commit
   *   phase, or a terminal FlushResult when there is nothing to flush
   * @throws IOException on WAL failure (the cache flush is aborted first)
   */
  protected PrepareFlushResult internalPrepareFlushCache(
      final WAL wal, final long myseqid, final Collection<Store> storesToFlush,
      MonitoredTask status, boolean writeFlushWalMarker)
          throws IOException {

    if (this.rsServices != null && this.rsServices.isAborted()) {
      // don't flush when the server is aborting
      throw new IOException("Aborting flush because server is aborted...");
    }
    final long startTime = EnvironmentEdgeManager.currentTime();
    // If nothing in memstore, bail out early -- but when a WAL is present,
    // still advance the sequence id so the caller knows everything up to that
    // id is persisted.
    if (this.memstoreSize.get() <= 0) {
      // double-checked under the updates lock
      MultiVersionConsistencyControl.WriteEntry writeEntry = null;
      this.updatesLock.writeLock().lock();
      try {
        if (this.memstoreSize.get() <= 0) {
          // Still empty with updates blocked: nothing to flush. With a WAL we
          // obtain a fresh sequence id (via an empty append) and complete an
          // mvcc transaction at that number so readers observe it; optionally
          // a CANNOT_FLUSH marker is written so replicas learn of the attempt.
          //
          //
          //
          if (wal != null) {
            writeEntry = mvcc.beginMemstoreInsert();
            long flushOpSeqId = getNextSequenceId(wal);
            FlushResult flushResult = new FlushResultImpl(
              FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY, flushOpSeqId, "Nothing to flush",
              writeFlushRequestMarkerToWAL(wal, writeFlushWalMarker));
            writeEntry.setWriteNumber(flushOpSeqId);
            mvcc.waitForPreviousTransactionsComplete(writeEntry);
            // null out so the finally block doesn't advance it a second time
            writeEntry = null;
            return new PrepareFlushResult(flushResult, myseqid);
          } else {
            // replay case: no WAL, no sequence id to advance
            return new PrepareFlushResult(
              new FlushResultImpl(FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY,
                "Nothing to flush", false),
              myseqid);
          }
        }
      } finally {
        this.updatesLock.writeLock().unlock();
        if (writeEntry != null) {
          // only reached if we bailed before completing the mvcc transaction
          mvcc.advanceMemstore(writeEntry);
        }
      }
    }

    if (LOG.isInfoEnabled()) {
      LOG.info("Started memstore flush for " + this + ", current region memstore size "
          + StringUtils.byteDesc(this.memstoreSize.get()) + ", and " + storesToFlush.size() + "/"
          + stores.size() + " column families' memstores are being flushed."
          + ((wal != null) ? "" : "; wal is null, using passed sequenceid=" + myseqid));
      // only log per-store sizes when this is a partial (per-family) flush
      if (this.stores.size() > storesToFlush.size()) {
        for (Store store: storesToFlush) {
          LOG.info("Flushing Column Family: " + store.getColumnFamilyName()
              + " which was occupying "
              + StringUtils.byteDesc(store.getMemStoreSize()) + " of memstore.")&#x3B;
        }
      }
    }
    // Stop updates while we snapshot the memstores. The snapshot switch is
    // cheap, so the updates write lock is held only briefly; the actual file
    // writing happens later without it.
    //
    //
    MultiVersionConsistencyControl.WriteEntry writeEntry = null;
    //
    //
    status.setStatus("Obtaining lock to block concurrent updates");
    // block waiting for the updates lock
    this.updatesLock.writeLock().lock();
    status.setStatus("Preparing to flush by snapshotting stores in " +
      getRegionInfo().getEncodedName());
    long totalFlushableSizeOfFlushableStores = 0;

    Set<byte[]> flushedFamilyNames = new HashSet<byte[]>();
    for (Store store: storesToFlush) {
      flushedFamilyNames.add(store.getFamily().getName());
    }

    // per-family flush contexts, committed files and flushable sizes, keyed by
    // family name (sorted so WAL markers list families deterministically)
    TreeMap<byte[], StoreFlushContext> storeFlushCtxs
      = new TreeMap<byte[], StoreFlushContext>(Bytes.BYTES_COMPARATOR);
    TreeMap<byte[], List<Path>> committedFiles = new TreeMap<byte[], List<Path>>(
        Bytes.BYTES_COMPARATOR);
    TreeMap<byte[], Long> storeFlushableSize
        = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
    // sequence id assigned to this flush operation (used in the WAL markers
    // and passed to each store's flush context)
    long flushOpSeqId = HConstants.NO_SEQNUM;
    // highest sequence id that will be wholly on disk once this flush commits
    long flushedSeqId = HConstants.NO_SEQNUM;
    byte[] encodedRegionName = getRegionInfo().getEncodedNameAsBytes();

    long trxId = 0;
    try {
      try {
        writeEntry = mvcc.beginMemstoreInsert();
        // Wait for all in-flight mvcc transactions to complete before taking
        // the snapshot; otherwise edits that are later rolled back out of the
        // memstore could end up written into HFiles.
        //
        //
        //
        mvcc.waitForPreviousTransactionsComplete(writeEntry);
        // null out so the finally block doesn't advance it a second time
        writeEntry = null;

        if (wal != null) {
          Long earliestUnflushedSequenceIdForTheRegion =
              wal.startCacheFlush(encodedRegionName, flushedFamilyNames);
          if (earliestUnflushedSequenceIdForTheRegion == null) {
            // null return is how startCacheFlush signals the WAL is closing
            String msg = this.getRegionInfo().getEncodedName() + " flush aborted; WAL closing.";
            status.setStatus(msg);
            return new PrepareFlushResult(
              new FlushResultImpl(FlushResult.Result.CANNOT_FLUSH, msg, false),
              myseqid);
          }
          flushOpSeqId = getNextSequenceId(wal);
          // NO_SEQNUM means no unflushed edits remain, so we're flushed up to
          // the flush op itself; otherwise up to just before the earliest one
          flushedSeqId =
            earliestUnflushedSequenceIdForTheRegion.longValue() == HConstants.NO_SEQNUM?
              flushOpSeqId: earliestUnflushedSequenceIdForTheRegion.longValue() - 1;
        } else {
          // replay: use the caller-provided sequence id for both
          flushedSeqId = flushOpSeqId = myseqid;
        }

        for (Store s : storesToFlush) {
          totalFlushableSizeOfFlushableStores += s.getFlushableSize();
          storeFlushCtxs.put(s.getFamily().getName(), s.createFlushContext(flushOpSeqId));
          committedFiles.put(s.getFamily().getName(), null); // filled in at commit time
          storeFlushableSize.put(s.getFamily().getName(), s.getFlushableSize());
        }

        // write the START_FLUSH marker (no sync yet; we sync below after
        // releasing the updates lock)
        if (wal != null && !writestate.readOnly) {
          FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.START_FLUSH,
            getRegionInfo(), flushOpSeqId, committedFiles);
          // trxId > 0 later tells the error path that START_FLUSH was written
          trxId = WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(),
            desc, sequenceId, false);
        }

        // take the snapshot in every store
        for (StoreFlushContext flush : storeFlushCtxs.values()) {
          flush.prepare();
        }
      } catch (IOException ex) {
        if (wal != null) {
          if (trxId > 0) { // START_FLUSH was written: append a matching ABORT_FLUSH
            try {
              FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.ABORT_FLUSH,
                getRegionInfo(), flushOpSeqId, committedFiles);
              WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(),
                desc, sequenceId, false);
            } catch (Throwable t) {
              LOG.warn("Received unexpected exception trying to write ABORT_FLUSH marker to WAL:" +
                  StringUtils.stringifyException(t));
              // best effort only; the original IOException is rethrown below
            }
          }
          // undo the wal.startCacheFlush() performed above
          wal.abortCacheFlush(this.getRegionInfo().getEncodedNameAsBytes());
          throw ex; // let upper layers deal with it
        }
      } finally {
        this.updatesLock.writeLock().unlock();
      }
      String s = "Finished memstore snapshotting " + this +
        ", syncing WAL and waiting on mvcc, flushsize=" + totalFlushableSizeOfFlushableStores;
      status.setStatus(s);
      if (LOG.isTraceEnabled()) LOG.trace(s);
      // sync the WAL so the START_FLUSH marker (and preceding edits) are
      // durable before we start writing store files
      if (wal != null) {
        try {
          wal.sync(); // ensure that flush marker is sync'ed
        } catch (IOException ioe) {
          wal.abortCacheFlush(this.getRegionInfo().getEncodedNameAsBytes());
          throw ioe;
        }
      }
    } finally {
      if (writeEntry != null) {
        // failure before the mvcc wait completed: mark the entry done so
        // readers are not blocked forever
        mvcc.advanceMemstore(writeEntry);
      }
    }
    return new PrepareFlushResult(storeFlushCtxs, committedFiles, storeFlushableSize, startTime, flushOpSeqId,
      flushedSeqId, totalFlushableSizeOfFlushableStores);
  }
2296 
2297   
2298 
2299 
2300 
2301 
2302 
2303   private boolean writeFlushRequestMarkerToWAL(WAL wal, boolean writeFlushWalMarker) {
2304     if (writeFlushWalMarker && wal != null && !writestate.readOnly) {
2305       FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.CANNOT_FLUSH,
2306         getRegionInfo(), -1, new TreeMap<byte[], List<Path>>());
2307       try {
2308         WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(),
2309           desc, sequenceId, true);
2310         return true;
2311       } catch (IOException e) {
2312         LOG.warn(getRegionInfo().getEncodedName() + " : "
2313             + "Received exception while trying to write the flush request to wal", e);
2314       }
2315     }
2316     return false;
2317   }
2318 
  /**
   * Commit phase of a flush: write the snapshots prepared by
   * {@link #internalPrepareFlushCache} out to store files, commit the files
   * into their stores, and append a COMMIT_FLUSH marker to the WAL.
   * <p>
   * Any failure here is treated as fatal: an ABORT_FLUSH marker is written,
   * the region is marked closing, and the region server is asked to abort so
   * the WAL can be replayed.
   * @param wal WAL to write markers to; may be null during replay
   * @param status task monitor to report progress to
   * @param prepareResult snapshot state from the prepare phase
   * @param storesToFlush the same stores passed to the prepare phase
   * @return a FLUSHED_* result, with compaction-needed flag set if any store
   *   commit requested a compaction
   * @throws DroppedSnapshotException (an IOException) wrapping whatever failed
   */
  protected FlushResult internalFlushCacheAndCommit(
        final WAL wal, MonitoredTask status, final PrepareFlushResult prepareResult,
        final Collection<Store> storesToFlush)
    throws IOException {

    // unpack the state produced by the prepare phase
    TreeMap<byte[], StoreFlushContext> storeFlushCtxs = prepareResult.storeFlushCtxs;
    TreeMap<byte[], List<Path>> committedFiles = prepareResult.committedFiles;
    long startTime = prepareResult.startTime;
    long flushOpSeqId = prepareResult.flushOpSeqId;
    long flushedSeqId = prepareResult.flushedSeqId;
    long totalFlushableSizeOfFlushableStores = prepareResult.totalFlushableSize;

    String s = "Flushing stores of " + this;
    status.setStatus(s);
    if (LOG.isTraceEnabled()) LOG.trace(s);

    // Any failure from here on is catastrophic: the snapshot is being dropped,
    // so if it is not fully persisted the only recovery is a server restart
    // with WAL replay.
    //
    //
    boolean compactionRequested = false;
    try {
      // Phase A: write each snapshot to a new (tmp) store file. Catch
      // Throwable below, not just IOException -- see HBASE-659-style NPEs.
      //
      //
      //

      for (StoreFlushContext flush : storeFlushCtxs.values()) {
        flush.flushCache(status);
      }

      // Phase B: commit the new files into the stores (moves them out of tmp
      // and resets store scanners onto them).
      // storesToFlush iterates in the same order the contexts were created in.
      Iterator<Store> it = storesToFlush.iterator();
      // NOTE(review): relies on storeFlushCtxs (sorted by family name)
      // matching storesToFlush's iteration order -- confirm for all callers
      for (StoreFlushContext flush : storeFlushCtxs.values()) {
        boolean needsCompaction = flush.commit(status);
        if (needsCompaction) {
          compactionRequested = true;
        }
        byte[] storeName = it.next().getFamily().getName();
        List<Path> storeCommittedFiles = flush.getCommittedFiles();
        committedFiles.put(storeName, storeCommittedFiles);
        // no files committed: the store's snapshot produced nothing, so don't
        // subtract its size from the global memstore accounting
        if (storeCommittedFiles == null || storeCommittedFiles.isEmpty()) {
          totalFlushableSizeOfFlushableStores -= prepareResult.storeFlushableSize.get(storeName);
        }
      }
      storeFlushCtxs.clear();

      // decrement the global memstore size by what was flushed
      this.addAndGetGlobalMemstoreSize(-totalFlushableSizeOfFlushableStores);

      if (wal != null) {
        // write and sync the COMMIT_FLUSH marker; failure falls into the
        // catch below and becomes a DroppedSnapshotException
        FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.COMMIT_FLUSH,
          getRegionInfo(), flushOpSeqId, committedFiles);
        WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(),
          desc, sequenceId, true);
      }
    } catch (Throwable t) {
      // The snapshot was not fully persisted; the WAL must be replayed to
      // restore its content to the memstore, which only a restart does.
      // Best-effort: record an ABORT_FLUSH marker, then abort the cache flush
      // so the WAL's sequence accounting is unwound.
      //
      //
      //
      if (wal != null) {
        try {
          FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.ABORT_FLUSH,
            getRegionInfo(), flushOpSeqId, committedFiles);
          WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(),
            desc, sequenceId, false);
        } catch (Throwable ex) {
          LOG.warn(getRegionInfo().getEncodedName() + " : "
              + "Received unexpected exception trying to write ABORT_FLUSH marker to WAL:"
              + StringUtils.stringifyException(ex));
          // swallowed: we are about to abort the server anyway
        }
        wal.abortCacheFlush(this.getRegionInfo().getEncodedNameAsBytes());
      }
      DroppedSnapshotException dse = new DroppedSnapshotException("region: " +
          Bytes.toStringBinary(getRegionInfo().getRegionName()));
      dse.initCause(t);
      status.abort("Flush failed: " + StringUtils.stringifyException(t));

      // We may hold only the region read lock here, so we cannot close(true)
      // (that needs the write lock). Setting closing rejects all further
      // region operations except close.
      //
      //
      this.closing.set(true);

      if (rsServices != null) {
        // safeguard in case the caller fails to handle the DSE by aborting
        rsServices.abort("Replay of WAL required. Forcing server shutdown", dse);
      }

      throw dse;
    }

    // all store files written and committed; release the WAL's flush state
    if (wal != null) {
      wal.completeCacheFlush(this.getRegionInfo().getEncodedNameAsBytes());
    }

    // record the flush time for each flushed store (used by shouldFlush)
    for (Store store: storesToFlush) {
      this.lastStoreFlushTimeMap.put(store, startTime);
    }

    // everything up to flushedSeqId is now on disk
    this.maxFlushedSeqId = flushedSeqId;

    // remember the sequence id of this flush operation
    this.lastFlushOpSeqId = flushOpSeqId;

    // wake anyone blocked on memstore size, e.g. checkResources()
    //
    synchronized (this) {
      notifyAll(); // FindBugs NN_NAKED_NOTIFY: intentional bare notify
    }

    long time = EnvironmentEdgeManager.currentTime() - startTime;
    long memstoresize = this.memstoreSize.get();
    String msg = "Finished memstore flush of ~"
        + StringUtils.byteDesc(totalFlushableSizeOfFlushableStores) + "/"
        + totalFlushableSizeOfFlushableStores + ", currentsize="
        + StringUtils.byteDesc(memstoresize) + "/" + memstoresize
        + " for region " + this + " in " + time + "ms, sequenceid="
        + flushOpSeqId +  ", compaction requested=" + compactionRequested
        + ((wal == null) ? "; wal=null" : "");
    LOG.info(msg);
    status.setStatus(msg);

    return new FlushResultImpl(compactionRequested ?
        FlushResult.Result.FLUSHED_COMPACTION_NEEDED :
          FlushResult.Result.FLUSHED_NO_COMPACTION_NEEDED,
        flushOpSeqId);
  }
2459 
2460   
2461 
2462 
2463 
2464 
2465   @VisibleForTesting
2466   protected long getNextSequenceId(final WAL wal) throws IOException {
2467     
2468     
2469     
2470     
2471     
2472     WALKey key = this.appendEmptyEdit(wal, null);
2473     return key.getSequenceId(maxWaitForSeqId);
2474   }
2475 
2476   
2477   
2478   
2479 
2480   @Override
2481   public Result getClosestRowBefore(final byte [] row, final byte [] family) throws IOException {
2482     if (coprocessorHost != null) {
2483       Result result = new Result();
2484       if (coprocessorHost.preGetClosestRowBefore(row, family, result)) {
2485         return result;
2486       }
2487     }
2488     
2489     
2490     checkRow(row, "getClosestRowBefore");
2491     startRegionOperation(Operation.GET);
2492     this.readRequestsCount.increment();
2493     try {
2494       Store store = getStore(family);
2495       
2496       Cell key = store.getRowKeyAtOrBefore(row);
2497       Result result = null;
2498       if (key != null) {
2499         Get get = new Get(CellUtil.cloneRow(key));
2500         get.addFamily(family);
2501         result = get(get);
2502       }
2503       if (coprocessorHost != null) {
2504         coprocessorHost.postGetClosestRowBefore(row, family, result);
2505       }
2506       return result;
2507     } finally {
2508       closeRegionOperation(Operation.GET);
2509     }
2510   }
2511 
2512   @Override
2513   public RegionScanner getScanner(Scan scan) throws IOException {
2514    return getScanner(scan, null);
2515   }
2516 
2517   protected RegionScanner getScanner(Scan scan,
2518       List<KeyValueScanner> additionalScanners) throws IOException {
2519     startRegionOperation(Operation.SCAN);
2520     try {
2521       
2522       if (!scan.hasFamilies()) {
2523         
2524         for (byte[] family: this.htableDescriptor.getFamiliesKeys()) {
2525           scan.addFamily(family);
2526         }
2527       } else {
2528         for (byte [] family : scan.getFamilyMap().keySet()) {
2529           checkFamily(family);
2530         }
2531       }
2532       return instantiateRegionScanner(scan, additionalScanners);
2533     } finally {
2534       closeRegionOperation(Operation.SCAN);
2535     }
2536   }
2537 
2538   protected RegionScanner instantiateRegionScanner(Scan scan,
2539       List<KeyValueScanner> additionalScanners) throws IOException {
2540     if (scan.isReversed()) {
2541       if (scan.getFilter() != null) {
2542         scan.getFilter().setReversed(true);
2543       }
2544       return new ReversedRegionScannerImpl(scan, additionalScanners, this);
2545     }
2546     return new RegionScannerImpl(scan, additionalScanners, this);
2547   }
2548 
2549   @Override
2550   public void prepareDelete(Delete delete) throws IOException {
2551     
2552     if(delete.getFamilyCellMap().isEmpty()){
2553       for(byte [] family : this.htableDescriptor.getFamiliesKeys()){
2554         
2555         delete.addFamily(family, delete.getTimeStamp());
2556       }
2557     } else {
2558       for(byte [] family : delete.getFamilyCellMap().keySet()) {
2559         if(family == null) {
2560           throw new NoSuchColumnFamilyException("Empty family is invalid");
2561         }
2562         checkFamily(family);
2563       }
2564     }
2565   }
2566 
2567   @Override
2568   public void delete(Delete delete) throws IOException {
2569     checkReadOnly();
2570     checkResources();
2571     startRegionOperation(Operation.DELETE);
2572     try {
2573       delete.getRow();
2574       
2575       doBatchMutate(delete);
2576     } finally {
2577       closeRegionOperation(Operation.DELETE);
2578     }
2579   }
2580 
2581   
2582 
2583 
  /** Dummy row key used by the package-private delete(NavigableMap, Durability) test helper. */
  private static final byte [] FOR_UNIT_TESTS_ONLY = Bytes.toBytes("ForUnitTestsOnly");
2585 
2586   
2587 
2588 
2589 
2590 
2591   void delete(NavigableMap<byte[], List<Cell>> familyMap,
2592       Durability durability) throws IOException {
2593     Delete delete = new Delete(FOR_UNIT_TESTS_ONLY);
2594     delete.setFamilyCellMap(familyMap);
2595     delete.setDurability(durability);
2596     doBatchMutate(delete);
2597   }
2598 
2599   @Override
2600   public void prepareDeleteTimestamps(Mutation mutation, Map<byte[], List<Cell>> familyMap,
2601       byte[] byteNow) throws IOException {
2602     for (Map.Entry<byte[], List<Cell>> e : familyMap.entrySet()) {
2603 
2604       byte[] family = e.getKey();
2605       List<Cell> cells = e.getValue();
2606       assert cells instanceof RandomAccess;
2607 
2608       Map<byte[], Integer> kvCount = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
2609       int listSize = cells.size();
2610       for (int i=0; i < listSize; i++) {
2611         Cell cell = cells.get(i);
2612         
2613         
2614         if (cell.getTimestamp() == HConstants.LATEST_TIMESTAMP && CellUtil.isDeleteType(cell)) {
2615           byte[] qual = CellUtil.cloneQualifier(cell);
2616           if (qual == null) qual = HConstants.EMPTY_BYTE_ARRAY;
2617 
2618           Integer count = kvCount.get(qual);
2619           if (count == null) {
2620             kvCount.put(qual, 1);
2621           } else {
2622             kvCount.put(qual, count + 1);
2623           }
2624           count = kvCount.get(qual);
2625 
2626           Get get = new Get(CellUtil.cloneRow(cell));
2627           get.setMaxVersions(count);
2628           get.addColumn(family, qual);
2629           if (coprocessorHost != null) {
2630             if (!coprocessorHost.prePrepareTimeStampForDeleteVersion(mutation, cell,
2631                 byteNow, get)) {
2632               updateDeleteLatestVersionTimeStamp(cell, get, count, byteNow);
2633             }
2634           } else {
2635             updateDeleteLatestVersionTimeStamp(cell, get, count, byteNow);
2636           }
2637         } else {
2638           CellUtil.updateLatestStamp(cell, byteNow, 0);
2639         }
2640       }
2641     }
2642   }
2643 
2644   void updateDeleteLatestVersionTimeStamp(Cell cell, Get get, int count, byte[] byteNow)
2645       throws IOException {
2646     List<Cell> result = get(get, false);
2647 
2648     if (result.size() < count) {
2649       
2650       CellUtil.updateLatestStamp(cell, byteNow, 0);
2651       return;
2652     }
2653     if (result.size() > count) {
2654       throw new RuntimeException("Unexpected size: " + result.size());
2655     }
2656     Cell getCell = result.get(count - 1);
2657     CellUtil.setTimestamp(cell, getCell.getTimestamp());
2658   }
2659 
2660   @Override
2661   public void put(Put put) throws IOException {
2662     checkReadOnly();
2663 
2664     
2665     
2666     
2667     
2668     checkResources();
2669     startRegionOperation(Operation.PUT);
2670     try {
2671       
2672       doBatchMutate(put);
2673     } finally {
2674       closeRegionOperation(Operation.PUT);
2675     }
2676   }
2677 
2678   
2679 
2680 
2681 
2682 
2683   private abstract static class BatchOperationInProgress<T> {
2684     T[] operations;
2685     int nextIndexToProcess = 0;
2686     OperationStatus[] retCodeDetails;
2687     WALEdit[] walEditsFromCoprocessors;
2688 
2689     public BatchOperationInProgress(T[] operations) {
2690       this.operations = operations;
2691       this.retCodeDetails = new OperationStatus[operations.length];
2692       this.walEditsFromCoprocessors = new WALEdit[operations.length];
2693       Arrays.fill(this.retCodeDetails, OperationStatus.NOT_RUN);
2694     }
2695 
2696     public abstract Mutation getMutation(int index);
2697     public abstract long getNonceGroup(int index);
2698     public abstract long getNonce(int index);
2699     
2700     public abstract Mutation[] getMutationsForCoprocs();
2701     public abstract boolean isInReplay();
2702     public abstract long getReplaySequenceId();
2703 
2704     public boolean isDone() {
2705       return nextIndexToProcess == operations.length;
2706     }
2707   }
2708 
2709   private static class MutationBatch extends BatchOperationInProgress<Mutation> {
2710     private long nonceGroup;
2711     private long nonce;
2712     public MutationBatch(Mutation[] operations, long nonceGroup, long nonce) {
2713       super(operations);
2714       this.nonceGroup = nonceGroup;
2715       this.nonce = nonce;
2716     }
2717 
2718     @Override
2719     public Mutation getMutation(int index) {
2720       return this.operations[index];
2721     }
2722 
2723     @Override
2724     public long getNonceGroup(int index) {
2725       return nonceGroup;
2726     }
2727 
2728     @Override
2729     public long getNonce(int index) {
2730       return nonce;
2731     }
2732 
2733     @Override
2734     public Mutation[] getMutationsForCoprocs() {
2735       return this.operations;
2736     }
2737 
2738     @Override
2739     public boolean isInReplay() {
2740       return false;
2741     }
2742 
2743     @Override
2744     public long getReplaySequenceId() {
2745       return 0;
2746     }
2747   }
2748 
2749   private static class ReplayBatch extends BatchOperationInProgress<MutationReplay> {
2750     private long replaySeqId = 0;
2751     public ReplayBatch(MutationReplay[] operations, long seqId) {
2752       super(operations);
2753       this.replaySeqId = seqId;
2754     }
2755 
2756     @Override
2757     public Mutation getMutation(int index) {
2758       return this.operations[index].mutation;
2759     }
2760 
2761     @Override
2762     public long getNonceGroup(int index) {
2763       return this.operations[index].nonceGroup;
2764     }
2765 
2766     @Override
2767     public long getNonce(int index) {
2768       return this.operations[index].nonce;
2769     }
2770 
2771     @Override
2772     public Mutation[] getMutationsForCoprocs() {
2773       assert false;
2774       throw new RuntimeException("Should not be called for replay batch");
2775     }
2776 
2777     @Override
2778     public boolean isInReplay() {
2779       return true;
2780     }
2781 
2782     @Override
2783     public long getReplaySequenceId() {
2784       return this.replaySeqId;
2785     }
2786   }
2787 
2788   @Override
2789   public OperationStatus[] batchMutate(Mutation[] mutations, long nonceGroup, long nonce)
2790       throws IOException {
2791     
2792     
2793     
2794     
2795     return batchMutate(new MutationBatch(mutations, nonceGroup, nonce));
2796   }
2797 
2798   public OperationStatus[] batchMutate(Mutation[] mutations) throws IOException {
2799     return batchMutate(mutations, HConstants.NO_NONCE, HConstants.NO_NONCE);
2800   }
2801 
2802   @Override
2803   public OperationStatus[] batchReplay(MutationReplay[] mutations, long replaySeqId)
2804       throws IOException {
2805     if (!RegionReplicaUtil.isDefaultReplica(getRegionInfo())
2806         && replaySeqId < lastReplayedOpenRegionSeqId) {
2807       
2808       
2809       if (LOG.isTraceEnabled()) {
2810         LOG.trace(getRegionInfo().getEncodedName() + " : "
2811           + "Skipping " + mutations.length + " mutations with replaySeqId=" + replaySeqId
2812           + " which is < than lastReplayedOpenRegionSeqId=" + lastReplayedOpenRegionSeqId);
2813         for (MutationReplay mut : mutations) {
2814           LOG.trace(getRegionInfo().getEncodedName() + " : Skipping : " + mut.mutation);
2815         }
2816       }
2817 
2818       OperationStatus[] statuses = new OperationStatus[mutations.length];
2819       for (int i = 0; i < statuses.length; i++) {
2820         statuses[i] = OperationStatus.SUCCESS;
2821       }
2822       return statuses;
2823     }
2824     return batchMutate(new ReplayBatch(mutations, replaySeqId));
2825   }
2826 
2827   
2828 
2829 
2830 
2831 
2832 
2833 
2834 
2835   OperationStatus[] batchMutate(BatchOperationInProgress<?> batchOp) throws IOException {
2836     boolean initialized = false;
2837     Operation op = batchOp.isInReplay() ? Operation.REPLAY_BATCH_MUTATE : Operation.BATCH_MUTATE;
2838     startRegionOperation(op);
2839     try {
2840       while (!batchOp.isDone()) {
2841         if (!batchOp.isInReplay()) {
2842           checkReadOnly();
2843         }
2844         checkResources();
2845 
2846         if (!initialized) {
2847           this.writeRequestsCount.add(batchOp.operations.length);
2848           if (!batchOp.isInReplay()) {
2849             doPreMutationHook(batchOp);
2850           }
2851           initialized = true;
2852         }
2853         doMiniBatchMutation(batchOp);
2854         long newSize = this.getMemstoreSize();
2855         if (isFlushSize(newSize)) {
2856           requestFlush();
2857         }
2858       }
2859     } finally {
2860       closeRegionOperation(op);
2861     }
2862     return batchOp.retCodeDetails;
2863   }
2864 
2865 
2866   private void doPreMutationHook(BatchOperationInProgress<?> batchOp)
2867       throws IOException {
2868     
2869     WALEdit walEdit = new WALEdit();
2870     if (coprocessorHost != null) {
2871       for (int i = 0 ; i < batchOp.operations.length; i++) {
2872         Mutation m = batchOp.getMutation(i);
2873         if (m instanceof Put) {
2874           if (coprocessorHost.prePut((Put) m, walEdit, m.getDurability())) {
2875             
2876             
2877             batchOp.retCodeDetails[i] = OperationStatus.SUCCESS;
2878           }
2879         } else if (m instanceof Delete) {
2880           Delete curDel = (Delete) m;
2881           if (curDel.getFamilyCellMap().isEmpty()) {
2882             
2883             prepareDelete(curDel);
2884           }
2885           if (coprocessorHost.preDelete(curDel, walEdit, m.getDurability())) {
2886             
2887             
2888             batchOp.retCodeDetails[i] = OperationStatus.SUCCESS;
2889           }
2890         } else {
2891           
2892           
2893           
2894           batchOp.retCodeDetails[i] = new OperationStatus(OperationStatusCode.FAILURE,
2895               "Put/Delete mutations only supported in batchMutate() now");
2896         }
2897         if (!walEdit.isEmpty()) {
2898           batchOp.walEditsFromCoprocessors[i] = walEdit;
2899           walEdit = new WALEdit();
2900         }
2901       }
2902     }
2903   }
2904 
  /**
   * Perform one "mini-batch" of the given batch: acquire as many row locks as
   * possible without blocking, apply those operations to the memstore, write a
   * single combined WAL entry, sync, and run coprocessor post-hooks. The slice
   * handled here is [batchOp.nextIndexToProcess, lastIndexExclusive); the
   * caller loops until the whole batch is done. Returns the memstore size added.
   *
   * Ordering is critical in this method: memstore insert happens under the
   * updates read-lock with an MVCC write number assigned first; the WAL append
   * happens before locks are released; the sync happens after release; and the
   * MVCC write entry is only completed (made visible to readers) after the sync.
   * Do not reorder without understanding those invariants.
   */
  @SuppressWarnings("unchecked")
  private long doMiniBatchMutation(BatchOperationInProgress<?> batchOp) throws IOException {
    boolean isInReplay = batchOp.isInReplay();
    // Whether every Put in this mini-batch touched the same column-family set.
    boolean putsCfSetConsistent = true;
    // Families of the first Put seen; basis for the consistency flag above.
    // NOTE(review): these flags are computed but not consulted in this method —
    // presumably used by callers/diagnostics elsewhere; confirm before removing.
    Set<byte[]> putsCfSet = null;
    // Same tracking for Deletes.
    boolean deletesCfSetConsistent = true;
    Set<byte[]> deletesCfSet = null;

    long currentNonceGroup = HConstants.NO_NONCE, currentNonce = HConstants.NO_NONCE;
    WALEdit walEdit = new WALEdit(isInReplay);
    MultiVersionConsistencyControl.WriteEntry writeEntry = null;
    long txid = 0;
    boolean doRollBackMemstore = false;
    boolean locked = false;

    // Row locks acquired for the operations processed in this mini-batch.
    List<RowLock> acquiredRowLocks = Lists.newArrayListWithCapacity(batchOp.operations.length);
    // Per-operation family->cells maps, index-aligned with batchOp.operations.
    Map<byte[], List<Cell>>[] familyMaps = new Map[batchOp.operations.length];
    List<Cell> memstoreCells = new ArrayList<Cell>();
    // The slice of the batch this invocation will handle.
    int firstIndex = batchOp.nextIndexToProcess;
    int lastIndexExclusive = firstIndex;
    boolean success = false;
    int noOfPuts = 0, noOfDeletes = 0;
    WALKey walKey = null;
    long mvccNum = 0;
    long addedSize = 0;
    try {
      // ------------------------------------------------------------------
      // STEP 1. Sanity-check each operation and try to acquire as many row
      // locks as we can. Only the first acquisition may block; later ones are
      // try-locks so one contended row doesn't stall the whole batch.
      // ------------------------------------------------------------------
      int numReadyToWrite = 0;
      long now = EnvironmentEdgeManager.currentTime();
      while (lastIndexExclusive < batchOp.operations.length) {
        Mutation mutation = batchOp.getMutation(lastIndexExclusive);
        boolean isPutMutation = mutation instanceof Put;

        Map<byte[], List<Cell>> familyMap = mutation.getFamilyCellMap();
        // Store the family map reference for use in later steps.
        familyMaps[lastIndexExclusive] = familyMap;

        // Skip anything a coprocessor (or earlier pass) already resolved.
        if (batchOp.retCodeDetails[lastIndexExclusive].getOperationStatusCode()
            != OperationStatusCode.NOT_RUN) {
          lastIndexExclusive++;
          continue;
        }

        try {
          if (isPutMutation) {
            // During replay, silently drop edits for families that no longer
            // exist; otherwise unknown families are an error.
            if (isInReplay) {
              removeNonExistentColumnFamilyForReplay(familyMap);
            } else {
              checkFamilies(familyMap.keySet());
            }
            checkTimestamps(mutation.getFamilyCellMap(), now);
          } else {
            prepareDelete((Delete) mutation);
          }
          checkRow(mutation.getRow(), "doMiniBatchMutation");
        } catch (NoSuchColumnFamilyException nscf) {
          LOG.warn("No such column family in batch mutation", nscf);
          batchOp.retCodeDetails[lastIndexExclusive] = new OperationStatus(
              OperationStatusCode.BAD_FAMILY, nscf.getMessage());
          lastIndexExclusive++;
          continue;
        } catch (FailedSanityCheckException fsce) {
          LOG.warn("Batch Mutation did not pass sanity check", fsce);
          batchOp.retCodeDetails[lastIndexExclusive] = new OperationStatus(
              OperationStatusCode.SANITY_CHECK_FAILURE, fsce.getMessage());
          lastIndexExclusive++;
          continue;
        } catch (WrongRegionException we) {
          LOG.warn("Batch mutation had a row that does not belong to this region", we);
          batchOp.retCodeDetails[lastIndexExclusive] = new OperationStatus(
              OperationStatusCode.SANITY_CHECK_FAILURE, we.getMessage());
          lastIndexExclusive++;
          continue;
        }

        // Block waiting for the lock only if we have nothing to write yet;
        // otherwise a failed try-lock just ends this mini-batch early.
        boolean shouldBlock = numReadyToWrite == 0;
        RowLock rowLock = null;
        try {
          rowLock = getRowLockInternal(mutation.getRow(), shouldBlock);
        } catch (IOException ioe) {
          LOG.warn("Failed getting lock in batch put, row="
            + Bytes.toStringBinary(mutation.getRow()), ioe);
        }
        if (rowLock == null) {
          // Could not get the lock without blocking; stop gathering operations
          // and process what we already hold.
          break;
        } else {
          acquiredRowLocks.add(rowLock);
        }

        lastIndexExclusive++;
        numReadyToWrite++;

        if (isPutMutation) {
          // Track whether all Puts share one family set (see NOTE above).
          if (putsCfSet == null) {
            putsCfSet = mutation.getFamilyCellMap().keySet();
          } else {
            putsCfSetConsistent = putsCfSetConsistent
                && mutation.getFamilyCellMap().keySet().equals(putsCfSet);
          }
        } else {
          if (deletesCfSet == null) {
            deletesCfSet = mutation.getFamilyCellMap().keySet();
          } else {
            deletesCfSetConsistent = deletesCfSetConsistent
                && mutation.getFamilyCellMap().keySet().equals(deletesCfSet);
          }
        }
      }

      // Refresh 'now' after the (possibly blocking) lock acquisition so the
      // timestamps we stamp below are current.
      now = EnvironmentEdgeManager.currentTime();
      byte[] byteNow = Bytes.toBytes(now);

      // Nothing locked means nothing to do in this mini-batch.
      if (numReadyToWrite <= 0) return 0L;

      // ------------------------------------------------------------------
      // STEP 2. Update any LATEST_TIMESTAMP placeholders and rewrite tags.
      // Skipped entirely during replay: replayed cells keep their original
      // timestamps (note the !isInReplay in the loop condition).
      // ------------------------------------------------------------------
      for (int i = firstIndex; !isInReplay && i < lastIndexExclusive; i++) {
        // Skip ops that failed sanity checks or were handled by coprocessors.
        if (batchOp.retCodeDetails[i].getOperationStatusCode()
            != OperationStatusCode.NOT_RUN) continue;

        Mutation mutation = batchOp.getMutation(i);
        if (mutation instanceof Put) {
          updateCellTimestamps(familyMaps[i].values(), byteNow);
          noOfPuts++;
        } else {
          prepareDeleteTimestamps(mutation, familyMaps[i], byteNow);
          noOfDeletes++;
        }
        rewriteCellTags(familyMaps[i], mutation);
      }

      lock(this.updatesLock.readLock(), numReadyToWrite);
      locked = true;
      if(isInReplay) {
        mvccNum = batchOp.getReplaySequenceId();
      } else {
        mvccNum = MultiVersionConsistencyControl.getPreAssignedWriteNumber(this.sequenceId);
      }
      // Begin the MVCC transaction with a pre-assigned write number so the
      // memstore cells we insert below are not visible to readers until the
      // write entry is completed (after the WAL sync).
      writeEntry = mvcc.beginMemstoreInsertWithSeqNum(mvccNum);

      // Give coprocessors a chance to veto/alter the whole mini-batch.
      if (!isInReplay && coprocessorHost != null) {
        MiniBatchOperationInProgress<Mutation> miniBatchOp =
          new MiniBatchOperationInProgress<Mutation>(batchOp.getMutationsForCoprocs(),
          batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive);
        if (coprocessorHost.preBatchMutate(miniBatchOp)) return 0L;
      }

      // ------------------------------------------------------------------
      // STEP 3. Write to the memstore. From the first insert onward we must
      // roll back on failure (doRollBackMemstore), since cells are in the
      // memstore but not yet durable in the WAL.
      // ------------------------------------------------------------------
      for (int i = firstIndex; i < lastIndexExclusive; i++) {
        if (batchOp.retCodeDetails[i].getOperationStatusCode()
            != OperationStatusCode.NOT_RUN) {
          continue;
        }
        doRollBackMemstore = true; // from this point on, a failure must undo the memstore
        addedSize += applyFamilyMapToMemstore(familyMaps[i], mvccNum, memstoreCells, isInReplay);
      }

      // ------------------------------------------------------------------
      // STEP 4. Build a single WAL edit for the whole mini-batch, tracking the
      // strongest durability requested by any operation.
      // ------------------------------------------------------------------
      Durability durability = Durability.USE_DEFAULT;
      for (int i = firstIndex; i < lastIndexExclusive; i++) {
        // Only ops still pending get written; mark them SUCCESS now.
        if (batchOp.retCodeDetails[i].getOperationStatusCode()
            != OperationStatusCode.NOT_RUN) {
          continue;
        }
        batchOp.retCodeDetails[i] = OperationStatus.SUCCESS;

        Mutation m = batchOp.getMutation(i);
        Durability tmpDur = getEffectiveDurability(m.getDurability());
        if (tmpDur.ordinal() > durability.ordinal()) {
          durability = tmpDur;
        }
        if (tmpDur == Durability.SKIP_WAL) {
          // Not written to the WAL; account for it separately.
          recordMutationWithoutWal(m.getFamilyCellMap());
          continue;
        }

        long nonceGroup = batchOp.getNonceGroup(i), nonce = batchOp.getNonce(i);
        // A WAL entry carries one nonce pair. Multiple nonces can only occur
        // during replay; flush the accumulated edit before switching nonces.
        if (nonceGroup != currentNonceGroup || nonce != currentNonce) {
          if (walEdit.size() > 0) {
            assert isInReplay;
            if (!isInReplay) {
              throw new IOException("Multiple nonces per batch and not in replay");
            }
            // Write what we have so far under the previous nonce pair.
            walKey = new ReplayHLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
              this.htableDescriptor.getTableName(), now, m.getClusterIds(),
              currentNonceGroup, currentNonce);
            txid = this.wal.append(this.htableDescriptor,  this.getRegionInfo(),  walKey,
              walEdit, getSequenceId(), true, null);
            walEdit = new WALEdit(isInReplay);
            walKey = null;
          }
          currentNonceGroup = nonceGroup;
          currentNonce = nonce;
        }

        // Coprocessor-produced edits for this op go in ahead of its own cells.
        WALEdit fromCP = batchOp.walEditsFromCoprocessors[i];
        if (fromCP != null) {
          for (Cell cell : fromCP.getCells()) {
            walEdit.add(cell);
          }
        }
        addFamilyMapToWALEdit(familyMaps[i], walEdit);
      }

      // ------------------------------------------------------------------
      // STEP 5. Append the combined edit to the WAL (no sync yet).
      // ------------------------------------------------------------------
      Mutation mutation = batchOp.getMutation(firstIndex);
      if (isInReplay) {
        // Replay: use a replay key carrying the original log sequence number.
        walKey = new ReplayHLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
          this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now,
          mutation.getClusterIds(), currentNonceGroup, currentNonce);
        long replaySeqId = batchOp.getReplaySequenceId();
        walKey.setOrigLogSeqNum(replaySeqId);

        // CAS-advance the region sequence id up to the replayed edit's id.
        while (true) {
          long seqId = getSequenceId().get();
          if (seqId >= replaySeqId) break;
          if (getSequenceId().compareAndSet(seqId, replaySeqId)) break;
        }
      }
      if (walEdit.size() > 0) {
        if (!isInReplay) {
        // Normal path: fresh key; the append below assigns the sequence id.
        walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
            this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now,
            mutation.getClusterIds(), currentNonceGroup, currentNonce);
        }

        txid = this.wal.append(this.htableDescriptor, this.getRegionInfo(), walKey, walEdit,
          getSequenceId(), true, memstoreCells);
      }
      if (walKey == null){
        // Nothing was appended (e.g. all SKIP_WAL); append an empty edit so we
        // still have a key/sequence id to complete the MVCC transaction with.
        walKey = this.appendEmptyEdit(this.wal, memstoreCells);
      }

      // ------------------------------------------------------------------
      // STEP 6. Release the updates lock and row locks. Safe before the sync:
      // the MVCC write entry is not completed yet, so readers can't see the
      // new cells until after STEP 8.
      // ------------------------------------------------------------------
      if (locked) {
        this.updatesLock.readLock().unlock();
        locked = false;
      }
      releaseRowLocks(acquiredRowLocks);

      // ------------------------------------------------------------------
      // STEP 7. Sync the WAL (honoring the strongest requested durability).
      // ------------------------------------------------------------------
      if (txid != 0) {
        syncOrDefer(txid, durability);
      }

      doRollBackMemstore = false;
      // Edits are durable now; run the coprocessor post-batch hook.
      if (!isInReplay && coprocessorHost != null) {
        MiniBatchOperationInProgress<Mutation> miniBatchOp =
          new MiniBatchOperationInProgress<Mutation>(batchOp.getMutationsForCoprocs(),
          batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive);
        coprocessorHost.postBatchMutate(miniBatchOp);
      }


      // ------------------------------------------------------------------
      // STEP 8. Complete the MVCC transaction: the new cells become visible
      // to readers only now, after the WAL sync succeeded.
      // ------------------------------------------------------------------
      if (writeEntry != null) {
        mvcc.completeMemstoreInsertWithSeqNum(writeEntry, walKey);
        writeEntry = null;
      }

      // ------------------------------------------------------------------
      // STEP 9. Per-operation coprocessor post-hooks for successful ops.
      // ------------------------------------------------------------------
      if (!isInReplay && coprocessorHost != null) {
        for (int i = firstIndex; i < lastIndexExclusive; i++) {
          // Only ops that actually succeeded get post-hooks.
          if (batchOp.retCodeDetails[i].getOperationStatusCode()
              != OperationStatusCode.SUCCESS) {
            continue;
          }
          Mutation m = batchOp.getMutation(i);
          if (m instanceof Put) {
            coprocessorHost.postPut((Put) m, walEdit, m.getDurability());
          } else {
            coprocessorHost.postDelete((Delete) m, walEdit, m.getDurability());
          }
        }
      }

      success = true;
      return addedSize;
    } finally {
      // On failure after memstore insertion, undo it and cancel the MVCC
      // transaction; on success, account the added size globally and make
      // sure the MVCC write entry is completed.
      if (doRollBackMemstore) {
        rollbackMemstore(memstoreCells);
        if (writeEntry != null) mvcc.cancelMemstoreInsert(writeEntry);
      } else {
        this.addAndGetGlobalMemstoreSize(addedSize);
        if (writeEntry != null) {
          mvcc.completeMemstoreInsertWithSeqNum(writeEntry, walKey);
        }
      }

      if (locked) {
        this.updatesLock.readLock().unlock();
      }
      releaseRowLocks(acquiredRowLocks);

      // Update region metrics for whatever was processed in this mini-batch.

      if (noOfPuts > 0) {
        // metrics may be null in unit-test/standalone contexts
        if (this.metricsRegion != null) {
          this.metricsRegion.updatePut();
        }
      }
      if (noOfDeletes > 0) {
        // metrics may be null in unit-test/standalone contexts
        if (this.metricsRegion != null) {
          this.metricsRegion.updateDelete();
        }
      }
      if (!success) {
        for (int i = firstIndex; i < lastIndexExclusive; i++) {
          if (batchOp.retCodeDetails[i].getOperationStatusCode() == OperationStatusCode.NOT_RUN) {
            batchOp.retCodeDetails[i] = OperationStatus.FAILURE;
          }
        }
      }
      if (coprocessorHost != null && !batchOp.isInReplay()) {
        // This hook runs regardless of success so coprocessors can observe
        // the final per-operation outcome of the mini-batch.
        MiniBatchOperationInProgress<Mutation> miniBatchOp =
            new MiniBatchOperationInProgress<Mutation>(batchOp.getMutationsForCoprocs(),
                batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex,
                lastIndexExclusive);
        coprocessorHost.postBatchMutateIndispensably(miniBatchOp, success);
      }

      batchOp.nextIndexToProcess = lastIndexExclusive;
    }
  }
3302 
3303   
3304 
3305 
3306 
3307   protected Durability getEffectiveDurability(Durability d) {
3308     return d == Durability.USE_DEFAULT ? this.durability : d;
3309   }
3310 
3311   
3312   
3313   
3314   
3315 
3316   @Override
3317   public boolean checkAndMutate(byte [] row, byte [] family, byte [] qualifier,
3318       CompareOp compareOp, ByteArrayComparable comparator, Mutation w,
3319       boolean writeToWAL)
3320   throws IOException{
3321     checkReadOnly();
3322     
3323     
3324     checkResources();
3325     boolean isPut = w instanceof Put;
3326     if (!isPut && !(w instanceof Delete))
3327       throw new org.apache.hadoop.hbase.DoNotRetryIOException("Action must " +
3328           "be Put or Delete");
3329     if (!Bytes.equals(row, w.getRow())) {
3330       throw new org.apache.hadoop.hbase.DoNotRetryIOException("Action's " +
3331           "getRow must match the passed row");
3332     }
3333 
3334     startRegionOperation();
3335     try {
3336       Get get = new Get(row);
3337       checkFamily(family);
3338       get.addColumn(family, qualifier);
3339 
3340       
3341       RowLock rowLock = getRowLock(get.getRow());
3342       
3343       mvcc.waitForPreviousTransactionsComplete();
3344       try {
3345         if (this.getCoprocessorHost() != null) {
3346           Boolean processed = null;
3347           if (w instanceof Put) {
3348             processed = this.getCoprocessorHost().preCheckAndPutAfterRowLock(row, family,
3349                 qualifier, compareOp, comparator, (Put) w);
3350           } else if (w instanceof Delete) {
3351             processed = this.getCoprocessorHost().preCheckAndDeleteAfterRowLock(row, family,
3352                 qualifier, compareOp, comparator, (Delete) w);
3353           }
3354           if (processed != null) {
3355             return processed;
3356           }
3357         }
3358         List<Cell> result = get(get, false);
3359 
3360         boolean valueIsNull = comparator.getValue() == null ||
3361           comparator.getValue().length == 0;
3362         boolean matches = false;
3363         long cellTs = 0;
3364         if (result.size() == 0 && valueIsNull) {
3365           matches = true;
3366         } else if (result.size() > 0 && result.get(0).getValueLength() == 0 &&
3367             valueIsNull) {
3368           matches = true;
3369           cellTs = result.get(0).getTimestamp();
3370         } else if (result.size() == 1 && !valueIsNull) {
3371           Cell kv = result.get(0);
3372           cellTs = kv.getTimestamp();
3373           int compareResult = comparator.compareTo(kv.getValueArray(),
3374               kv.getValueOffset(), kv.getValueLength());
3375           switch (compareOp) {
3376           case LESS:
3377             matches = compareResult < 0;
3378             break;
3379           case LESS_OR_EQUAL:
3380             matches = compareResult <= 0;
3381             break;
3382           case EQUAL:
3383             matches = compareResult == 0;
3384             break;
3385           case NOT_EQUAL:
3386             matches = compareResult != 0;
3387             break;
3388           case GREATER_OR_EQUAL:
3389             matches = compareResult >= 0;
3390             break;
3391           case GREATER:
3392             matches = compareResult > 0;
3393             break;
3394           default:
3395             throw new RuntimeException("Unknown Compare op " + compareOp.name());
3396           }
3397         }
3398         
3399         if (matches) {
3400           
3401           
3402           
3403           
3404           long now = EnvironmentEdgeManager.currentTime();
3405           long ts = Math.max(now, cellTs); 
3406           byte[] byteTs = Bytes.toBytes(ts);
3407 
3408           if (w instanceof Put) {
3409             updateCellTimestamps(w.getFamilyCellMap().values(), byteTs);
3410           }
3411           
3412           
3413 
3414           
3415           
3416           doBatchMutate(w);
3417           this.checkAndMutateChecksPassed.increment();
3418           return true;
3419         }
3420         this.checkAndMutateChecksFailed.increment();
3421         return false;
3422       } finally {
3423         rowLock.release();
3424       }
3425     } finally {
3426       closeRegionOperation();
3427     }
3428   }
3429 
3430   
3431   
3432   
3433   
3434 
3435   @Override
3436   public boolean checkAndRowMutate(byte [] row, byte [] family, byte [] qualifier,
3437       CompareOp compareOp, ByteArrayComparable comparator, RowMutations rm,
3438       boolean writeToWAL) throws IOException {
3439     checkReadOnly();
3440     
3441     
3442     checkResources();
3443 
3444     startRegionOperation();
3445     try {
3446       Get get = new Get(row);
3447       checkFamily(family);
3448       get.addColumn(family, qualifier);
3449 
3450       
3451       RowLock rowLock = getRowLock(get.getRow());
3452       
3453       mvcc.waitForPreviousTransactionsComplete();
3454       try {
3455         List<Cell> result = get(get, false);
3456 
3457         boolean valueIsNull = comparator.getValue() == null ||
3458             comparator.getValue().length == 0;
3459         boolean matches = false;
3460         long cellTs = 0;
3461         if (result.size() == 0 && valueIsNull) {
3462           matches = true;
3463         } else if (result.size() > 0 && result.get(0).getValueLength() == 0 &&
3464             valueIsNull) {
3465           matches = true;
3466           cellTs = result.get(0).getTimestamp();
3467         } else if (result.size() == 1 && !valueIsNull) {
3468           Cell kv = result.get(0);
3469           cellTs = kv.getTimestamp();
3470           int compareResult = comparator.compareTo(kv.getValueArray(),
3471               kv.getValueOffset(), kv.getValueLength());
3472           switch (compareOp) {
3473           case LESS:
3474             matches = compareResult < 0;
3475             break;
3476           case LESS_OR_EQUAL:
3477             matches = compareResult <= 0;
3478             break;
3479           case EQUAL:
3480             matches = compareResult == 0;
3481             break;
3482           case NOT_EQUAL:
3483             matches = compareResult != 0;
3484             break;
3485           case GREATER_OR_EQUAL:
3486             matches = compareResult >= 0;
3487             break;
3488           case GREATER:
3489             matches = compareResult > 0;
3490             break;
3491           default:
3492             throw new RuntimeException("Unknown Compare op " + compareOp.name());
3493           }
3494         }
3495         
3496         if (matches) {
3497           
3498           
3499           
3500           
3501           long now = EnvironmentEdgeManager.currentTime();
3502           long ts = Math.max(now, cellTs); 
3503           byte[] byteTs = Bytes.toBytes(ts);
3504 
3505           for (Mutation w : rm.getMutations()) {
3506             if (w instanceof Put) {
3507               updateCellTimestamps(w.getFamilyCellMap().values(), byteTs);
3508             }
3509             
3510             
3511           }
3512 
3513           
3514           
3515           mutateRow(rm);
3516           this.checkAndMutateChecksPassed.increment();
3517           return true;
3518         }
3519         this.checkAndMutateChecksFailed.increment();
3520         return false;
3521       } finally {
3522         rowLock.release();
3523       }
3524     } finally {
3525       closeRegionOperation();
3526     }
3527   }
3528 
3529   private void doBatchMutate(Mutation mutation) throws IOException {
3530     
3531     OperationStatus[] batchMutate = this.batchMutate(new Mutation[] { mutation });
3532     if (batchMutate[0].getOperationStatusCode().equals(OperationStatusCode.SANITY_CHECK_FAILURE)) {
3533       throw new FailedSanityCheckException(batchMutate[0].getExceptionMsg());
3534     } else if (batchMutate[0].getOperationStatusCode().equals(OperationStatusCode.BAD_FAMILY)) {
3535       throw new NoSuchColumnFamilyException(batchMutate[0].getExceptionMsg());
3536     }
3537   }
3538 
3539   
3540 
3541 
3542 
3543 
3544 
3545 
3546 
3547 
3548 
3549 
3550 
3551 
3552   public void addRegionToSnapshot(SnapshotDescription desc,
3553       ForeignExceptionSnare exnSnare) throws IOException {
3554     Path rootDir = FSUtils.getRootDir(conf);
3555     Path snapshotDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(desc, rootDir);
3556 
3557     SnapshotManifest manifest = SnapshotManifest.create(conf, getFilesystem(),
3558                                                         snapshotDir, desc, exnSnare);
3559     manifest.addRegion(this);
3560   }
3561 
3562   @Override
3563   public void updateCellTimestamps(final Iterable<List<Cell>> cellItr, final byte[] now)
3564       throws IOException {
3565     for (List<Cell> cells: cellItr) {
3566       if (cells == null) continue;
3567       assert cells instanceof RandomAccess;
3568       int listSize = cells.size();
3569       for (int i = 0; i < listSize; i++) {
3570         CellUtil.updateLatestStamp(cells.get(i), now, 0);
3571       }
3572     }
3573   }
3574 
3575   
3576 
3577 
3578   void rewriteCellTags(Map<byte[], List<Cell>> familyMap, final Mutation m) {
3579     
3580     
3581 
3582     if (m.getTTL() == Long.MAX_VALUE) {
3583       return;
3584     }
3585 
3586     
3587 
3588     for (Map.Entry<byte[], List<Cell>> e: familyMap.entrySet()) {
3589       List<Cell> cells = e.getValue();
3590       assert cells instanceof RandomAccess;
3591       int listSize = cells.size();
3592       for (int i = 0; i < listSize; i++) {
3593         Cell cell = cells.get(i);
3594         List<Tag> newTags = Tag.carryForwardTags(null, cell);
3595         newTags = carryForwardTTLTag(newTags, m);
3596 
3597         
3598         cells.set(i, new KeyValue(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(),
3599           cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(),
3600           cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(),
3601           cell.getTimestamp(), KeyValue.Type.codeToType(cell.getTypeByte()),
3602           cell.getValueArray(), cell.getValueOffset(), cell.getValueLength(),
3603           newTags));
3604       }
3605     }
3606   }
3607 
3608   
3609 
3610 
3611 
3612 
3613 
3614   private void checkResources() throws RegionTooBusyException {
3615     
3616     if (this.getRegionInfo().isMetaRegion()) return;
3617 
3618     if (this.memstoreSize.get() > this.blockingMemStoreSize) {
3619       blockedRequestsCount.increment();
3620       requestFlush();
3621       throw new RegionTooBusyException("Above memstore limit, " +
3622           "regionName=" + (this.getRegionInfo() == null ? "unknown" :
3623           this.getRegionInfo().getRegionNameAsString()) +
3624           ", server=" + (this.getRegionServerServices() == null ? "unknown" :
3625           this.getRegionServerServices().getServerName()) +
3626           ", memstoreSize=" + memstoreSize.get() +
3627           ", blockingMemStoreSize=" + blockingMemStoreSize);
3628     }
3629   }
3630 
3631   
3632 
3633 
3634   protected void checkReadOnly() throws IOException {
3635     if (isReadOnly()) {
3636       throw new DoNotRetryIOException("region is read only");
3637     }
3638   }
3639 
3640   protected void checkReadsEnabled() throws IOException {
3641     if (!this.writestate.readsEnabled) {
3642       throw new IOException(getRegionInfo().getEncodedName()
3643         + ": The region's reads are disabled. Cannot serve the request");
3644     }
3645   }
3646 
3647   public void setReadsEnabled(boolean readsEnabled) {
3648    if (readsEnabled && !this.writestate.readsEnabled) {
3649      LOG.info(getRegionInfo().getEncodedName() + " : Enabling reads for region.");
3650     }
3651     this.writestate.setReadsEnabled(readsEnabled);
3652   }
3653 
3654   
3655 
3656 
3657 
3658 
3659 
3660   private void put(final byte [] row, byte [] family, List<Cell> edits)
3661   throws IOException {
3662     NavigableMap<byte[], List<Cell>> familyMap;
3663     familyMap = new TreeMap<byte[], List<Cell>>(Bytes.BYTES_COMPARATOR);
3664 
3665     familyMap.put(family, edits);
3666     Put p = new Put(row);
3667     p.setFamilyCellMap(familyMap);
3668     doBatchMutate(p);
3669   }
3670 
3671   
3672 
3673 
3674 
3675 
3676 
3677 
3678 
3679 
3680 
3681 
3682 
3683 
3684 
3685 
3686   private long applyFamilyMapToMemstore(Map<byte[], List<Cell>> familyMap,
3687     long mvccNum, List<Cell> memstoreCells, boolean isInReplay) throws IOException {
3688     long size = 0;
3689 
3690     for (Map.Entry<byte[], List<Cell>> e : familyMap.entrySet()) {
3691       byte[] family = e.getKey();
3692       List<Cell> cells = e.getValue();
3693       assert cells instanceof RandomAccess;
3694       Store store = getStore(family);
3695       int listSize = cells.size();
3696       for (int i=0; i < listSize; i++) {
3697         Cell cell = cells.get(i);
3698         CellUtil.setSequenceId(cell, mvccNum);
3699         Pair<Long, Cell> ret = store.add(cell);
3700         size += ret.getFirst();
3701         memstoreCells.add(ret.getSecond());
3702         if(isInReplay) {
3703           
3704           CellUtil.setSequenceId(ret.getSecond(), mvccNum);
3705         }
3706       }
3707     }
3708 
3709      return size;
3710    }
3711 
3712   
3713 
3714 
3715 
3716 
3717   private void rollbackMemstore(List<Cell> memstoreCells) {
3718     int kvsRolledback = 0;
3719 
3720     for (Cell cell : memstoreCells) {
3721       byte[] family = CellUtil.cloneFamily(cell);
3722       Store store = getStore(family);
3723       store.rollback(cell);
3724       kvsRolledback++;
3725     }
3726     LOG.debug("rollbackMemstore rolled back " + kvsRolledback);
3727   }
3728 
3729   @Override
3730   public void checkFamilies(Collection<byte[]> families) throws NoSuchColumnFamilyException {
3731     for (byte[] family : families) {
3732       checkFamily(family);
3733     }
3734   }
3735 
3736   
3737 
3738 
3739 
3740   private void removeNonExistentColumnFamilyForReplay(
3741       final Map<byte[], List<Cell>> familyMap) {
3742     List<byte[]> nonExistentList = null;
3743     for (byte[] family : familyMap.keySet()) {
3744       if (!this.htableDescriptor.hasFamily(family)) {
3745         if (nonExistentList == null) {
3746           nonExistentList = new ArrayList<byte[]>();
3747         }
3748         nonExistentList.add(family);
3749       }
3750     }
3751     if (nonExistentList != null) {
3752       for (byte[] family : nonExistentList) {
3753         
3754         LOG.info("No family for " + Bytes.toString(family) + " omit from reply.");
3755         familyMap.remove(family);
3756       }
3757     }
3758   }
3759 
3760   @Override
3761   public void checkTimestamps(final Map<byte[], List<Cell>> familyMap, long now)
3762       throws FailedSanityCheckException {
3763     if (timestampSlop == HConstants.LATEST_TIMESTAMP) {
3764       return;
3765     }
3766     long maxTs = now + timestampSlop;
3767     for (List<Cell> kvs : familyMap.values()) {
3768       assert kvs instanceof RandomAccess;
3769       int listSize  = kvs.size();
3770       for (int i=0; i < listSize; i++) {
3771         Cell cell = kvs.get(i);
3772         
3773         long ts = cell.getTimestamp();
3774         if (ts != HConstants.LATEST_TIMESTAMP && ts > maxTs) {
3775           throw new FailedSanityCheckException("Timestamp for KV out of range "
3776               + cell + " (too.new=" + timestampSlop + ")");
3777         }
3778       }
3779     }
3780   }
3781 
3782   
3783 
3784 
3785 
3786 
3787 
3788   private void addFamilyMapToWALEdit(Map<byte[], List<Cell>> familyMap,
3789       WALEdit walEdit) {
3790     for (List<Cell> edits : familyMap.values()) {
3791       assert edits instanceof RandomAccess;
3792       int listSize = edits.size();
3793       for (int i=0; i < listSize; i++) {
3794         Cell cell = edits.get(i);
3795         walEdit.add(cell);
3796       }
3797     }
3798   }
3799 
3800   private void requestFlush() {
3801     if (this.rsServices == null) {
3802       return;
3803     }
3804     synchronized (writestate) {
3805       if (this.writestate.isFlushRequested()) {
3806         return;
3807       }
3808       writestate.flushRequested = true;
3809     }
3810     
3811     this.rsServices.getFlushRequester().requestFlush(this, false);
3812     if (LOG.isDebugEnabled()) {
3813       LOG.debug("Flush requested on " + this);
3814     }
3815   }
3816 
3817   
3818 
3819 
3820 
3821   private boolean isFlushSize(final long size) {
3822     return size > this.memstoreFlushSize;
3823   }
3824 
3825   
3826 
3827 
3828 
3829 
3830 
3831 
3832 
3833 
3834 
3835 
3836 
3837 
3838 
3839 
3840 
3841 
3842 
3843 
3844 
3845 
3846 
3847 
3848 
3849 
3850 
3851 
3852 
3853 
3854 
3855 
3856 
3857 
3858 
  /**
   * Replays any recovered.edits files found under the region directory and
   * returns the highest sequence id seen. Files whose encoded max sequence id
   * is at or below the minimum of the per-store max sequence ids are skipped
   * entirely; after replay, edits files are either archived or deleted.
   *
   * @param regiondir the region directory to look for recovered edits under
   * @param maxSeqIdInStores max sequence id already persisted, per store family
   * @param reporter used to report replay progress; may be null
   * @param status monitored task updated with flush status during replay
   * @return the highest sequence id applied (or the region minimum if none)
   * @throws IOException on replay failure, unless skip-errors is configured
   */
  protected long replayRecoveredEditsIfAny(final Path regiondir,
      Map<byte[], Long> maxSeqIdInStores,
      final CancelableProgressable reporter, final MonitoredTask status)
      throws IOException {
    // Edits with a sequence id at or below the minimum across all stores are
    // already durable everywhere and never need replaying.
    long minSeqIdForTheRegion = -1;
    for (Long maxSeqIdInStore : maxSeqIdInStores.values()) {
      if (maxSeqIdInStore < minSeqIdForTheRegion || minSeqIdForTheRegion == -1) {
        minSeqIdForTheRegion = maxSeqIdInStore;
      }
    }
    long seqid = minSeqIdForTheRegion;

    FileSystem fs = this.fs.getFileSystem();
    NavigableSet<Path> files = WALSplitter.getSplitEditFilesSorted(fs, regiondir);
    if (LOG.isDebugEnabled()) {
      LOG.debug("Found " + (files == null ? 0 : files.size())
        + " recovered edits file(s) under " + regiondir);
    }

    if (files == null || files.isEmpty()) return seqid;

    for (Path edits: files) {
      if (edits == null || !fs.exists(edits)) {
        LOG.warn("Null or non-existent edits file: " + edits);
        continue;
      }
      // Empty files carry no edits; delete and move on.
      if (isZeroLengthThenDelete(fs, edits)) continue;

      long maxSeqId;
      // The file name encodes the maximum sequence id contained in the file.
      String fileName = edits.getName();
      maxSeqId = Math.abs(Long.parseLong(fileName));
      if (maxSeqId <= minSeqIdForTheRegion) {
        // Whole file is below what every store has already persisted.
        if (LOG.isDebugEnabled()) {
          String msg = "Maximum sequenceid for this wal is " + maxSeqId
            + " and minimum sequenceid for the region is " + minSeqIdForTheRegion
            + ", skipped the whole file, path=" + edits;
          LOG.debug(msg);
        }
        continue;
      }

      try {
        // Replay this file; track the highest sequence id applied so far.
        seqid = Math.max(seqid, replayRecoveredEdits(edits, maxSeqIdInStores, reporter));
      } catch (IOException e) {
        // Optionally tolerate corrupt/unreadable edits files by moving them
        // aside instead of aborting the region open.
        boolean skipErrors = conf.getBoolean(
            HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS,
            conf.getBoolean(
                "hbase.skip.errors",
                HConstants.DEFAULT_HREGION_EDITS_REPLAY_SKIP_ERRORS));
        if (conf.get("hbase.skip.errors") != null) {
          LOG.warn(
              "The property 'hbase.skip.errors' has been deprecated. Please use " +
              HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS + " instead.");
        }
        if (skipErrors) {
          Path p = WALSplitter.moveAsideBadEditsFile(fs, edits);
          LOG.error(HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS
              + "=true so continuing. Renamed " + edits +
              " as " + p, e);
        } else {
          throw e;
        }
      }
    }
    // Replay accounting was per-region; clear it now that replay is done.
    if (this.rsAccounting != null) {
      this.rsAccounting.clearRegionReplayEditsSize(getRegionInfo().getRegionName());
    }
    if (seqid > minSeqIdForTheRegion) {
      // We applied edits beyond what was persisted: flush so the edits files
      // below can safely be removed.
      internalFlushcache(null, seqid, stores.values(), status, false);
    }
    // Clean up the edits files: archive them (via the store-file archiver,
    // using the recovered-edits dir name as a fake family) when configured,
    // otherwise delete them outright.
    if (files.size() > 0 && this.conf.getBoolean("hbase.region.archive.recovered.edits", false)) {
      String fakeFamilyName = WALSplitter.getRegionDirRecoveredEditsDir(regiondir).getName();
      Set<StoreFile> fakeStoreFiles = new HashSet<StoreFile>(files.size());
      for (Path file: files) {
        fakeStoreFiles.add(new StoreFile(getRegionFileSystem().getFileSystem(), file, this.conf,
          null, null));
      }
      getRegionFileSystem().removeStoreFiles(fakeFamilyName, fakeStoreFiles);
    } else {
      for (Path file: files) {
        if (!fs.delete(file, false)) {
          LOG.error("Failed delete of " + file);
        } else {
          LOG.debug("Deleted recovered.edits file=" + file);
        }
      }
    }
    return seqid;
  }
3957 
3958   
3959 
3960 
3961 
3962 
3963 
3964 
3965 
3966 
  /**
   * Replays the edits from a single recovered-edits file into the region's
   * memstores, skipping edits for other regions, deleted families, metadata
   * markers, and edits already persisted per {@code maxSeqIdInStores}.
   *
   * @param edits the recovered-edits file to replay
   * @param maxSeqIdInStores max persisted sequence id per store family; edits
   *                         at or below these ids are skipped
   * @param reporter progress reporter polled periodically; replay aborts if
   *                 it reports failure; may be null
   * @return the highest sequence id read from the file
   * @throws IOException if replay fails (EOF and parse-corruption cases are
   *         tolerated by moving the file aside instead)
   */
  private long replayRecoveredEdits(final Path edits,
      Map<byte[], Long> maxSeqIdInStores, final CancelableProgressable reporter)
    throws IOException {
    String msg = "Replaying edits from " + edits;
    LOG.info(msg);
    MonitoredTask status = TaskMonitor.get().createStatus(msg);
    FileSystem fs = this.fs.getFileSystem();

    status.setStatus("Opening recovered edits");
    WAL.Reader reader = null;
    try {
      reader = WALFactory.createReader(fs, edits, conf);
      long currentEditSeqId = -1;
      long currentReplaySeqId = -1;
      long firstSeqIdInLog = -1;
      long skippedEdits = 0;
      long editsCount = 0;
      long intervalEdits = 0;
      WAL.Entry entry;
      Store store = null;
      boolean reported_once = false;
      ServerNonceManager ng = this.rsServices == null ? null : this.rsServices.getNonceManager();

      try {
        // How many edits to process between progress checks.
        int interval = this.conf.getInt("hbase.hstore.report.interval.edits", 2000);
        // Minimum wall-clock gap between progress reports, in ms.
        int period = this.conf.getInt("hbase.hstore.report.period", 300000);
        long lastReport = EnvironmentEdgeManager.currentTime();

        while ((entry = reader.next()) != null) {
          WALKey key = entry.getKey();
          WALEdit val = entry.getEdit();

          if (ng != null) { 
            // Record the nonce so retried client operations are deduplicated.
            ng.reportOperationFromWal(key.getNonceGroup(), key.getNonce(), key.getWriteTime());
          }

          if (reporter != null) {
            intervalEdits += val.size();
            if (intervalEdits >= interval) {
              // Enough edits processed; see if it is also time to report.
              intervalEdits = 0;
              long cur = EnvironmentEdgeManager.currentTime();
              if (lastReport + period <= cur) {
                status.setStatus("Replaying edits..." +
                    " skipped=" + skippedEdits +
                    " edits=" + editsCount);
                // A false return from the reporter means we must stop.
                if(!reporter.progress()) {
                  msg = "Progressable reporter failed, stopping replay";
                  LOG.warn(msg);
                  status.abort(msg);
                  throw new IOException(msg);
                }
                reported_once = true;
                lastReport = cur;
              }
            }
          }

          if (firstSeqIdInLog == -1) {
            firstSeqIdInLog = key.getLogSeqNum();
          }
          if (currentEditSeqId > key.getLogSeqNum()) {
            // Sequence ids should be monotonically increasing within a file;
            // log loudly but keep the larger id we already saw.
            LOG.error(getRegionInfo().getEncodedName() + " : "
                 + "Found decreasing SeqId. PreId=" + currentEditSeqId + " key=" + key
                + "; edit=" + val);
          } else {
            currentEditSeqId = key.getLogSeqNum();
          }
          // Prefer the original log sequence number when present (edits that
          // were themselves replayed carry one); fall back to the edit's own.
          currentReplaySeqId = (key.getOrigLogSeqNum() > 0) ?
            key.getOrigLogSeqNum() : currentEditSeqId;

          // Coprocessors get a chance to veto restoring this entry.
          if (coprocessorHost != null) {
            status.setStatus("Running pre-WAL-restore hook in coprocessors");
            if (coprocessorHost.preWALRestore(this.getRegionInfo(), key, val)) {
              // Coprocessor bypassed this entry.
              continue;
            }
          }
          // Skip edits that belong to a different region.
          if (!Bytes.equals(key.getEncodedRegionName(),
              this.getRegionInfo().getEncodedNameAsBytes())) {
            skippedEdits++;
            continue;
          }

          boolean flush = false;
          for (Cell cell: val.getCells()) {
            // Metadata-family cells are markers, not data; a compaction
            // marker is replayed specially, everything else is skipped.
            if (CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) {
              CompactionDescriptor compaction = WALEdit.getCompaction(cell);
              if (compaction != null) {
                // Re-apply the compaction's file changes (remove inputs).
                replayWALCompactionMarker(compaction, false, true, Long.MAX_VALUE);
              }
              skippedEdits++;
              continue;
            }
            // Cache the store across cells of the same family.
            if (store == null || !CellUtil.matchingFamily(cell, store.getFamily().getName())) {
              store = getStore(cell);
            }
            if (store == null) {
              // Family no longer exists (e.g. dropped); skip its edits.
              LOG.warn("No family for " + cell);
              skippedEdits++;
              continue;
            }
            // Skip edits already persisted in this store's files.
            if (key.getLogSeqNum() <= maxSeqIdInStores.get(store.getFamily()
                .getName())) {
              skippedEdits++;
              continue;
            }
            CellUtil.setSequenceId(cell, currentReplaySeqId);

            // Apply to the memstore; restoreEdit returns true when the
            // memstore has grown past the flush threshold.
            flush |= restoreEdit(store, cell);
            editsCount++;
          }
          if (flush) {
            internalFlushcache(null, currentEditSeqId, stores.values(), status, false);
          }

          if (coprocessorHost != null) {
            coprocessorHost.postWALRestore(this.getRegionInfo(), key, val);
          }
        }
      } catch (EOFException eof) {
        Path p = WALSplitter.moveAsideBadEditsFile(fs, edits);
        msg = "Encountered EOF. Most likely due to Master failure during " +
            "wal splitting, so we have this data in another edit.  " +
            "Continuing, but renaming " + edits + " as " + p;
        LOG.warn(msg, eof);
        status.abort(msg);
      } catch (IOException ioe) {
        // A ParseException cause indicates file corruption: tolerate it by
        // moving the file aside. Anything else is rethrown.
        if (ioe.getCause() instanceof ParseException) {
          Path p = WALSplitter.moveAsideBadEditsFile(fs, edits);
          msg = "File corruption encountered!  " +
              "Continuing, but renaming " + edits + " as " + p;
          LOG.warn(msg, ioe);
          status.setStatus(msg);
        } else {
          status.abort(StringUtils.stringifyException(ioe));
          // Other IO errors are fatal to the replay.
          throw ioe;
        }
      }
      if (reporter != null && !reported_once) {
        reporter.progress();
      }
      msg = "Applied " + editsCount + ", skipped " + skippedEdits +
        ", firstSequenceIdInLog=" + firstSeqIdInLog +
        ", maxSequenceIdInLog=" + currentEditSeqId + ", path=" + edits;
      status.markComplete(msg);
      LOG.debug(msg);
      return currentEditSeqId;
    } finally {
      status.cleanup();
      if (reader != null) {
         reader.close();
      }
    }
  }
4145 
4146   
4147 
4148 
4149 
4150 
  /**
   * Replays a compaction marker from the WAL, re-applying the compaction's
   * file changes to the target store. Markers older than the last replayed
   * region-open or compaction event are skipped.
   *
   * @param compaction descriptor of the compaction to replay
   * @param pickCompactionFiles whether to pick up the compaction output files
   * @param removeFiles whether to remove the compaction input files
   * @param replaySeqId sequence id of the marker, used for staleness checks
   * @throws IOException if the store cannot apply the marker
   */
  void replayWALCompactionMarker(CompactionDescriptor compaction, boolean pickCompactionFiles,
      boolean removeFiles, long replaySeqId)
      throws IOException {
    checkTargetRegion(compaction.getEncodedRegionName().toByteArray(),
      "Compaction marker from WAL ", compaction);

    synchronized (writestate) {
      // Ignore markers predating the last region-open we replayed; the open
      // already reflects their effects.
      if (replaySeqId < lastReplayedOpenRegionSeqId) {
        LOG.warn(getRegionInfo().getEncodedName() + " : "
            + "Skipping replaying compaction event :" + TextFormat.shortDebugString(compaction)
            + " because its sequence id " + replaySeqId + " is smaller than this regions "
            + "lastReplayedOpenRegionSeqId of " + lastReplayedOpenRegionSeqId);
        return;
      }
      // Also ignore markers older than the newest compaction we replayed.
      if (replaySeqId < lastReplayedCompactionSeqId) {
        LOG.warn(getRegionInfo().getEncodedName() + " : "
            + "Skipping replaying compaction event :" + TextFormat.shortDebugString(compaction)
            + " because its sequence id " + replaySeqId + " is smaller than this regions "
            + "lastReplayedCompactionSeqId of " + lastReplayedCompactionSeqId);
        return;
      } else {
        lastReplayedCompactionSeqId = replaySeqId;
      }

      if (LOG.isDebugEnabled()) {
        LOG.debug(getRegionInfo().getEncodedName() + " : "
            + "Replaying compaction marker " + TextFormat.shortDebugString(compaction)
            + " with seqId=" + replaySeqId + " and lastReplayedOpenRegionSeqId="
            + lastReplayedOpenRegionSeqId);
      }

      startRegionOperation(Operation.REPLAY_EVENT);
      try {
        Store store = this.getStore(compaction.getFamilyName().toByteArray());
        if (store == null) {
          // Family was deleted after the compaction was logged.
          LOG.warn(getRegionInfo().getEncodedName() + " : "
              + "Found Compaction WAL edit for deleted family:"
              + Bytes.toString(compaction.getFamilyName().toByteArray()));
          return;
        }
        store.replayCompactionMarker(compaction, pickCompactionFiles, removeFiles);
        logRegionFiles();
      } catch (FileNotFoundException ex) {
        // Files may have been archived already; this is tolerable.
        LOG.warn(getRegionInfo().getEncodedName() + " : "
            + "At least one of the store files in compaction: "
            + TextFormat.shortDebugString(compaction)
            + " doesn't exist any more. Skip loading the file(s)", ex);
      } finally {
        closeRegionOperation(Operation.REPLAY_EVENT);
      }
    }
  }
4203 
4204   void replayWALFlushMarker(FlushDescriptor flush, long replaySeqId) throws IOException {
4205     checkTargetRegion(flush.getEncodedRegionName().toByteArray(),
4206       "Flush marker from WAL ", flush);
4207 
4208     if (ServerRegionReplicaUtil.isDefaultReplica(this.getRegionInfo())) {
4209       return; 
4210     }
4211 
4212     if (LOG.isDebugEnabled()) {
4213       LOG.debug(getRegionInfo().getEncodedName() + " : "
4214           + "Replaying flush marker " + TextFormat.shortDebugString(flush));
4215     }
4216 
4217     startRegionOperation(Operation.REPLAY_EVENT); 
4218     try {
4219       FlushAction action = flush.getAction();
4220       switch (action) {
4221       case START_FLUSH:
4222         replayWALFlushStartMarker(flush);
4223         break;
4224       case COMMIT_FLUSH:
4225         replayWALFlushCommitMarker(flush);
4226         break;
4227       case ABORT_FLUSH:
4228         replayWALFlushAbortMarker(flush);
4229         break;
4230       case CANNOT_FLUSH:
4231         replayWALFlushCannotFlushMarker(flush, replaySeqId);
4232         break;
4233       default:
4234         LOG.warn(getRegionInfo().getEncodedName() + " : " +
4235           "Received a flush event with unknown action, ignoring. " +
4236           TextFormat.shortDebugString(flush));
4237         break;
4238       }
4239 
4240       logRegionFiles();
4241     } finally {
4242       closeRegionOperation(Operation.REPLAY_EVENT);
4243     }
4244   }
4245 
4246   
4247 
4248 
4249 
  /**
   * Replays a flush START marker from the primary: prepares a memstore
   * snapshot for the named stores so that a later COMMIT marker can swap in
   * the flushed files and drop the snapshot.
   *
   * @param flush the flush descriptor naming the stores and the flush seqId
   * @return the prepare result when a new prepare was attempted, else null
   *         (marker stale, or a flush was already in progress)
   * @throws IOException if preparing the flush fails
   */
  @VisibleForTesting
  PrepareFlushResult replayWALFlushStartMarker(FlushDescriptor flush) throws IOException {
    long flushSeqId = flush.getFlushSequenceNumber();

    // Resolve the store for each family named in the marker; families that
    // no longer exist are skipped with a warning.
    HashSet<Store> storesToFlush = new HashSet<Store>();
    for (StoreFlushDescriptor storeFlush : flush.getStoreFlushesList()) {
      byte[] family = storeFlush.getFamilyName().toByteArray();
      Store store = getStore(family);
      if (store == null) {
        LOG.warn(getRegionInfo().getEncodedName() + " : "
          + "Received a flush start marker from primary, but the family is not found. Ignoring"
          + " StoreFlushDescriptor:" + TextFormat.shortDebugString(storeFlush));
        continue;
      }
      storesToFlush.add(store);
    }

    MonitoredTask status = TaskMonitor.get().createStatus("Preparing flush " + this);

    // All prepare-state transitions happen under the writestate lock.
    synchronized (writestate) {
      try {
        // Stale marker: a later region-open already supersedes it.
        if (flush.getFlushSequenceNumber() < lastReplayedOpenRegionSeqId) {
          LOG.warn(getRegionInfo().getEncodedName() + " : "
              + "Skipping replaying flush event :" + TextFormat.shortDebugString(flush)
              + " because its sequence id is smaller than this regions lastReplayedOpenRegionSeqId "
              + " of " + lastReplayedOpenRegionSeqId);
          return null;
        }
        if (numMutationsWithoutWAL.get() > 0) {
          numMutationsWithoutWAL.set(0);
          dataInMemoryWithoutWAL.set(0);
        }

        if (!writestate.flushing) {
          // No flush in progress: run the normal prepare phase to snapshot
          // the selected memstores.
          PrepareFlushResult prepareResult = internalPrepareFlushCache(null,
            flushSeqId, storesToFlush, status, false);
          if (prepareResult.result == null) {
            // Prepare succeeded; remember it for the matching COMMIT marker.
            this.writestate.flushing = true;
            this.prepareFlushResult = prepareResult;
            status.markComplete("Flush prepare successful");
            if (LOG.isDebugEnabled()) {
              LOG.debug(getRegionInfo().getEncodedName() + " : "
                  + " Prepared flush with seqId:" + flush.getFlushSequenceNumber());
            }
          } else {
            // Prepare reported a result (i.e. did not snapshot). An empty
            // memstore is still treated as a valid (empty) prepare so the
            // COMMIT marker finds matching state.
            if (prepareResult.getResult().getResult() ==
                  FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY) {
              this.writestate.flushing = true;
              this.prepareFlushResult = prepareResult;
              if (LOG.isDebugEnabled()) {
                LOG.debug(getRegionInfo().getEncodedName() + " : "
                  + " Prepared empty flush with seqId:" + flush.getFlushSequenceNumber());
              }
            }
            status.abort("Flush prepare failed with " + prepareResult.result);
            // Nothing else to do here; the marker is effectively ignored.
          }
          return prepareResult;
        } else {
          // A prepare is already pending; compare seqIds to decide.
          // NOTE(review): the stray '+' at the line breaks below applies
          // unary plus to the long — harmless, but likely a typo.
          if (flush.getFlushSequenceNumber() == this.prepareFlushResult.flushOpSeqId) {
            // Duplicate of the pending prepare; ignore.
            LOG.warn(getRegionInfo().getEncodedName() + " : "
                + "Received a flush prepare marker with the same seqId: " +
                + flush.getFlushSequenceNumber() + " before clearing the previous one with seqId: "
                + prepareFlushResult.flushOpSeqId + ". Ignoring");
            
          } else if (flush.getFlushSequenceNumber() < this.prepareFlushResult.flushOpSeqId) {
            // Older than the pending prepare; the pending one wins.
            LOG.warn(getRegionInfo().getEncodedName() + " : "
                + "Received a flush prepare marker with a smaller seqId: " +
                + flush.getFlushSequenceNumber() + " before clearing the previous one with seqId: "
                + prepareFlushResult.flushOpSeqId + ". Ignoring");
            
          } else {
            // Newer than the pending prepare; also ignored — the pending
            // snapshot stays until its COMMIT (or a region-open) clears it.
            LOG.warn(getRegionInfo().getEncodedName() + " : "
                + "Received a flush prepare marker with a larger seqId: " +
                + flush.getFlushSequenceNumber() + " before clearing the previous one with seqId: "
                + prepareFlushResult.flushOpSeqId + ". Ignoring");
          }
        }
      } finally {
        status.cleanup();
        writestate.notifyAll();
      }
    }
    return null;
  }
4360 
  /**
   * Replays a flush COMMIT marker from the primary: picks up the flushed
   * store files and reconciles them against any pending prepared snapshot,
   * depending on how the marker's seqId compares to the prepared one.
   *
   * @param flush the flush descriptor with the committed files and seqId
   * @throws IOException if picking up the flushed files fails
   */
  @VisibleForTesting
  void replayWALFlushCommitMarker(FlushDescriptor flush) throws IOException {
    MonitoredTask status = TaskMonitor.get().createStatus("Committing flush " + this);

    // All flush-state transitions happen under the writestate lock.
    synchronized (writestate) {
      try {
        // Stale marker: a later region-open already supersedes it.
        if (flush.getFlushSequenceNumber() < lastReplayedOpenRegionSeqId) {
          LOG.warn(getRegionInfo().getEncodedName() + " : "
            + "Skipping replaying flush event :" + TextFormat.shortDebugString(flush)
            + " because its sequence id is smaller than this regions lastReplayedOpenRegionSeqId "
            + " of " + lastReplayedOpenRegionSeqId);
          return;
        }

        if (writestate.flushing) {
          PrepareFlushResult prepareFlushResult = this.prepareFlushResult;
          if (flush.getFlushSequenceNumber() == prepareFlushResult.flushOpSeqId) {
            // Commit matches the pending prepare exactly: pick up the files
            // and drop the prepared snapshot.
            if (LOG.isDebugEnabled()) {
              LOG.debug(getRegionInfo().getEncodedName() + " : "
                  + "Received a flush commit marker with seqId:" + flush.getFlushSequenceNumber()
                  + " and a previous prepared snapshot was found");
            }
            replayFlushInStores(flush, prepareFlushResult, true);

            // The snapshot's bytes are no longer in the memstore.
            this.addAndGetGlobalMemstoreSize(-prepareFlushResult.totalFlushableSize);

            this.prepareFlushResult = null;
            writestate.flushing = false;
          } else if (flush.getFlushSequenceNumber() < prepareFlushResult.flushOpSeqId) {
            // Commit is for an earlier flush than the one we prepared:
            // pick up its files, but keep the newer prepared snapshot
            // pending for its own commit.
            LOG.warn(getRegionInfo().getEncodedName() + " : "
                + "Received a flush commit marker with smaller seqId: "
                + flush.getFlushSequenceNumber() + " than what we have prepared with seqId: "
                + prepareFlushResult.flushOpSeqId + ". Picking up new file, but not dropping"
                +"  prepared memstore snapshot");
            replayFlushInStores(flush, prepareFlushResult, false);

            // Prepared snapshot and flushing flag are intentionally left
            // untouched here.
          } else {
            // Commit is for a later flush than the one we prepared: pick up
            // the files, drop the (now superseded) prepared snapshot, and
            // also drop memstore contents covered by the commit's seqId.
            LOG.warn(getRegionInfo().getEncodedName() + " : "
                + "Received a flush commit marker with larger seqId: "
                + flush.getFlushSequenceNumber() + " than what we have prepared with seqId: " +
                prepareFlushResult.flushOpSeqId + ". Picking up new file and dropping prepared"
                +" memstore snapshot");

            replayFlushInStores(flush, prepareFlushResult, true);

            // Snapshot bytes leave the global memstore accounting.
            this.addAndGetGlobalMemstoreSize(-prepareFlushResult.totalFlushableSize);

            // Discard any memstore edits already covered by the flushed
            // files (all stores).
            dropMemstoreContentsForSeqId(flush.getFlushSequenceNumber(), null);

            this.prepareFlushResult = null;
            writestate.flushing = false;
          }
          // A successful flush pickup means the replica's view is
          // consistent enough to serve reads again.
          this.setReadsEnabled(true);
        } else {
          // No prepared snapshot (e.g. we opened after the START marker):
          // still pick up the files and drop covered memstore contents.
          LOG.warn(getRegionInfo().getEncodedName() + " : "
              + "Received a flush commit marker with seqId:" + flush.getFlushSequenceNumber()
              + ", but no previous prepared snapshot was found");
          replayFlushInStores(flush, null, false);

          dropMemstoreContentsForSeqId(flush.getFlushSequenceNumber(), null);
        }

        status.markComplete("Flush commit successful");

        // The commit's seqId is now the region's flushed-up-to point.
        this.maxFlushedSeqId = flush.getFlushSequenceNumber();

        // Advance the MVCC read point so readers can see everything up to
        // the flushed seqId.
        getMVCC().advanceMemstoreReadPointIfNeeded(flush.getFlushSequenceNumber());

      } catch (FileNotFoundException ex) {
        // Flushed files may already be compacted/archived away; tolerate.
        LOG.warn(getRegionInfo().getEncodedName() + " : "
            + "At least one of the store files in flush: " + TextFormat.shortDebugString(flush)
            + " doesn't exist any more. Skip loading the file(s)", ex);
      }
      finally {
        status.cleanup();
        writestate.notifyAll();
      }
    }

    // Wake anyone blocked waiting on this region (e.g. waiting for reads to
    // be enabled).
    synchronized (this) {
      notifyAll(); 
    }
  }
4483 
4484   
4485 
4486 
4487 
4488 
4489 
4490 
4491 
4492   private void replayFlushInStores(FlushDescriptor flush, PrepareFlushResult prepareFlushResult,
4493       boolean dropMemstoreSnapshot)
4494       throws IOException {
4495     for (StoreFlushDescriptor storeFlush : flush.getStoreFlushesList()) {
4496       byte[] family = storeFlush.getFamilyName().toByteArray();
4497       Store store = getStore(family);
4498       if (store == null) {
4499         LOG.warn(getRegionInfo().getEncodedName() + " : "
4500             + "Received a flush commit marker from primary, but the family is not found."
4501             + "Ignoring StoreFlushDescriptor:" + storeFlush);
4502         continue;
4503       }
4504       List<String> flushFiles = storeFlush.getFlushOutputList();
4505       StoreFlushContext ctx = null;
4506       long startTime = EnvironmentEdgeManager.currentTime();
4507       if (prepareFlushResult == null || prepareFlushResult.storeFlushCtxs == null) {
4508         ctx = store.createFlushContext(flush.getFlushSequenceNumber());
4509       } else {
4510         ctx = prepareFlushResult.storeFlushCtxs.get(family);
4511         startTime = prepareFlushResult.startTime;
4512       }
4513 
4514       if (ctx == null) {
4515         LOG.warn(getRegionInfo().getEncodedName() + " : "
4516             + "Unexpected: flush commit marker received from store "
4517             + Bytes.toString(family) + " but no associated flush context. Ignoring");
4518         continue;
4519       }
4520 
4521       ctx.replayFlush(flushFiles, dropMemstoreSnapshot); 
4522 
4523       
4524       this.lastStoreFlushTimeMap.put(store, startTime);
4525     }
4526   }
4527 
4528   
4529 
4530 
4531 
4532 
4533 
4534   private long dropMemstoreContentsForSeqId(long seqId, Store store) throws IOException {
4535     long totalFreedSize = 0;
4536     this.updatesLock.writeLock().lock();
4537     try {
4538       mvcc.waitForPreviousTransactionsComplete();
4539       long currentSeqId = getSequenceId().get();
4540       if (seqId >= currentSeqId) {
4541         
4542         LOG.info(getRegionInfo().getEncodedName() + " : "
4543             + "Dropping memstore contents as well since replayed flush seqId: "
4544             + seqId + " is greater than current seqId:" + currentSeqId);
4545 
4546         
4547         if (store == null ) {
4548           for (Store s : stores.values()) {
4549             totalFreedSize += doDropStoreMemstoreContentsForSeqId(s, currentSeqId);
4550           }
4551         } else {
4552           totalFreedSize += doDropStoreMemstoreContentsForSeqId(store, currentSeqId);
4553         }
4554       } else {
4555         LOG.info(getRegionInfo().getEncodedName() + " : "
4556             + "Not dropping memstore contents since replayed flush seqId: "
4557             + seqId + " is smaller than current seqId:" + currentSeqId);
4558       }
4559     } finally {
4560       this.updatesLock.writeLock().unlock();
4561     }
4562     return totalFreedSize;
4563   }
4564 
4565   private long doDropStoreMemstoreContentsForSeqId(Store s, long currentSeqId) throws IOException {
4566     long snapshotSize = s.getFlushableSize();
4567     this.addAndGetGlobalMemstoreSize(-snapshotSize);
4568     StoreFlushContext ctx = s.createFlushContext(currentSeqId);
4569     ctx.prepare();
4570     ctx.abort();
4571     return snapshotSize;
4572   }
4573 
  /**
   * Handles a flush ABORT marker replayed from the primary's WAL.
   * Intentionally a no-op: any prepared snapshot is simply left in place to
   * be reconciled by a later commit marker or region-open event.
   *
   * @param flush the flush descriptor from the WAL (unused)
   */
  private void replayWALFlushAbortMarker(FlushDescriptor flush) {
    // Deliberately empty — see javadoc.
  }
4579 
4580   private void replayWALFlushCannotFlushMarker(FlushDescriptor flush, long replaySeqId) {
4581     synchronized (writestate) {
4582       if (this.lastReplayedOpenRegionSeqId > replaySeqId) {
4583         LOG.warn(getRegionInfo().getEncodedName() + " : "
4584           + "Skipping replaying flush event :" + TextFormat.shortDebugString(flush)
4585           + " because its sequence id " + replaySeqId + " is smaller than this regions "
4586           + "lastReplayedOpenRegionSeqId of " + lastReplayedOpenRegionSeqId);
4587         return;
4588       }
4589 
4590       
4591       
4592       
4593       
4594       
4595       this.setReadsEnabled(true);
4596     }
4597   }
4598 
4599   @VisibleForTesting
4600   PrepareFlushResult getPrepareFlushResult() {
4601     return prepareFlushResult;
4602   }
4603 
  /**
   * Replays a region event marker (region open) from the primary's WAL on a
   * secondary replica: refreshes each store's files to the primary's view,
   * drops superseded memstore contents and prepared snapshots, advances the
   * MVCC read point, and re-enables reads. Close events and markers older
   * than the last replayed open are ignored.
   *
   * @param regionEvent the region event descriptor from the WAL
   * @throws IOException if refreshing store files or dropping memstore
   *         contents fails
   */
  void replayWALRegionEventMarker(RegionEventDescriptor regionEvent) throws IOException {
    checkTargetRegion(regionEvent.getEncodedRegionName().toByteArray(),
      "RegionEvent marker from WAL ", regionEvent);

    startRegionOperation(Operation.REPLAY_EVENT);
    try {
      if (ServerRegionReplicaUtil.isDefaultReplica(this.getRegionInfo())) {
        // Region event markers only matter on replicas.
        return; 
      }

      if (regionEvent.getEventType() == EventType.REGION_CLOSE) {
        // Nothing to do for a close event.
        return;
      }
      if (regionEvent.getEventType() != EventType.REGION_OPEN) {
        LOG.warn(getRegionInfo().getEncodedName() + " : "
            + "Unknown region event received, ignoring :"
            + TextFormat.shortDebugString(regionEvent));
        return;
      }

      if (LOG.isDebugEnabled()) {
        LOG.debug(getRegionInfo().getEncodedName() + " : "
          + "Replaying region open event marker " + TextFormat.shortDebugString(regionEvent));
      }

      // All state transitions happen under the writestate lock.
      synchronized (writestate) {
        // Track the newest region-open we have replayed; older flush and
        // compaction markers are judged against this id.
        if (this.lastReplayedOpenRegionSeqId <= regionEvent.getLogSequenceNumber()) {
          this.lastReplayedOpenRegionSeqId = regionEvent.getLogSequenceNumber();
        } else {
          LOG.warn(getRegionInfo().getEncodedName() + " : "
            + "Skipping replaying region event :" + TextFormat.shortDebugString(regionEvent)
            + " because its sequence id is smaller than this regions lastReplayedOpenRegionSeqId "
            + " of " + lastReplayedOpenRegionSeqId);
          return;
        }

        // Bring each store's file list in line with what the primary
        // reported at open time.
        for (StoreDescriptor storeDescriptor : regionEvent.getStoresList()) {
          byte[] family = storeDescriptor.getFamilyName().toByteArray();
          Store store = getStore(family);
          if (store == null) {
            // Family was dropped; nothing to refresh.
            LOG.warn(getRegionInfo().getEncodedName() + " : "
                + "Received a region open marker from primary, but the family is not found. "
                + "Ignoring. StoreDescriptor:" + storeDescriptor);
            continue;
          }

          long storeSeqId = store.getMaxSequenceId();
          List<String> storeFiles = storeDescriptor.getStoreFileList();
          try {
            store.refreshStoreFiles(storeFiles); 
          } catch (FileNotFoundException ex) {
            // Files may already be compacted/archived away; tolerate.
            LOG.warn(getRegionInfo().getEncodedName() + " : "
                    + "At least one of the store files: " + storeFiles
                    + " doesn't exist any more. Skip loading the file(s)", ex);
            continue;
          }
          if (store.getMaxSequenceId() != storeSeqId) {
            // The file refresh changed the store's contents; treat it as a
            // flush for last-flush-time bookkeeping.
            lastStoreFlushTimeMap.put(store, EnvironmentEdgeManager.currentTime());
          }

          if (writestate.flushing) {
            // A prepared snapshot at or below the open's seqId is now
            // superseded by the refreshed files: abort and un-account it.
            if (this.prepareFlushResult.flushOpSeqId <= regionEvent.getLogSequenceNumber()) {
              StoreFlushContext ctx = this.prepareFlushResult.storeFlushCtxs == null ?
                  null : this.prepareFlushResult.storeFlushCtxs.get(family);
              if (ctx != null) {
                long snapshotSize = store.getFlushableSize();
                ctx.abort();
                this.addAndGetGlobalMemstoreSize(-snapshotSize);
                this.prepareFlushResult.storeFlushCtxs.remove(family);
              }
            }
          }

          // Memstore edits covered by the open event's seqId are redundant.
          dropMemstoreContentsForSeqId(regionEvent.getLogSequenceNumber(), store);
          if (storeSeqId > this.maxFlushedSeqId) {
            this.maxFlushedSeqId = storeSeqId;
          }
        }

        // If the whole prepared flush is now empty/obsolete, clear it.
        dropPrepareFlushIfPossible();

        // Let readers see everything up to the flushed-to point.
        getMVCC().advanceMemstoreReadPointIfNeeded(this.maxFlushedSeqId);

        // The replica's view now matches the primary's open state.
        this.setReadsEnabled(true);

        // Wake anyone blocked waiting on this region.
        synchronized (this) {
          notifyAll(); 
        }
      }
      logRegionFiles();
    } finally {
      closeRegionOperation(Operation.REPLAY_EVENT);
    }
  }
4721 
  /**
   * Replays a bulk load marker from the WAL on a secondary region replica by
   * opening the bulk-loaded store files named in the descriptor. This is a
   * no-op on the primary (default) replica, which performed the actual load.
   *
   * @param bulkLoadEvent descriptor listing, per store, the bulk-loaded file names
   * @throws IOException if the marker targets a different region or loading fails
   */
  void replayWALBulkLoadEventMarker(WALProtos.BulkLoadDescriptor bulkLoadEvent) throws IOException {
    checkTargetRegion(bulkLoadEvent.getEncodedRegionName().toByteArray(),
      "BulkLoad marker from WAL ", bulkLoadEvent);

    if (ServerRegionReplicaUtil.isDefaultReplica(this.getRegionInfo())) {
      return; // if primary nothing to do
    }

    if (LOG.isDebugEnabled()) {
      LOG.debug(getRegionInfo().getEncodedName() + " : "
              +  "Replaying bulkload event marker " + TextFormat.shortDebugString(bulkLoadEvent));
    }
    // check if multiple families are involved; governs the kind of region lock taken below
    boolean multipleFamilies = false;
    byte[] family = null;
    for (StoreDescriptor storeDescriptor : bulkLoadEvent.getStoresList()) {
      byte[] fam = storeDescriptor.getFamilyName().toByteArray();
      if (family == null) {
        family = fam;
      } else if (!Bytes.equals(family, fam)) {
        multipleFamilies = true;
        break;
      }
    }

    startBulkRegionOperation(multipleFamilies);
    try {
      // we will use writestate as a coarse-grained lock for all the replay events
      synchronized (writestate) {
        // Skip markers already covered by the most recent region-open event we replayed:
        // their files are visible via that open. A negative bulkload seq num means no
        // sequence id was recorded for the load, so those markers are always replayed.
        if (bulkLoadEvent.getBulkloadSeqNum() >= 0
            && this.lastReplayedOpenRegionSeqId >= bulkLoadEvent.getBulkloadSeqNum()) {
          LOG.warn(getRegionInfo().getEncodedName() + " : "
              + "Skipping replaying bulkload event :"
              + TextFormat.shortDebugString(bulkLoadEvent)
              + " because its sequence id is smaller than this region's lastReplayedOpenRegionSeqId"
              + " =" + lastReplayedOpenRegionSeqId);

          return;
        }

        for (StoreDescriptor storeDescriptor : bulkLoadEvent.getStoresList()) {
          // stores of the primary may differ from this replaying replica
          family = storeDescriptor.getFamilyName().toByteArray();
          Store store = getStore(family);
          if (store == null) {
            LOG.warn(getRegionInfo().getEncodedName() + " : "
                    + "Received a bulk load marker from primary, but the family is not found. "
                    + "Ignoring. StoreDescriptor:" + storeDescriptor);
            continue;
          }

          List<String> storeFiles = storeDescriptor.getStoreFileList();
          for (String storeFile : storeFiles) {
            StoreFileInfo storeFileInfo = null;
            try {
              storeFileInfo = fs.getStoreFileInfo(Bytes.toString(family), storeFile);
              store.bulkLoadHFile(storeFileInfo);
            } catch(FileNotFoundException ex) {
              // Best effort: the file may have been removed (e.g. compacted away) already.
              LOG.warn(getRegionInfo().getEncodedName() + " : "
                      + ((storeFileInfo != null) ? storeFileInfo.toString() :
                            (new Path(Bytes.toString(family), storeFile)).toString())
                      + " doesn't exist any more. Skip loading the file");
            }
          }
        }
      }
      if (bulkLoadEvent.getBulkloadSeqNum() > 0) {
        // Advance the MVCC read point so the bulk-loaded data becomes visible to readers.
        getMVCC().advanceMemstoreReadPointIfNeeded(bulkLoadEvent.getBulkloadSeqNum());
      }
    } finally {
      closeBulkRegionOperation();
    }
  }
4801 
4802   
4803 
4804 
4805   private void dropPrepareFlushIfPossible() {
4806     if (writestate.flushing) {
4807       boolean canDrop = true;
4808       if (prepareFlushResult.storeFlushCtxs != null) {
4809         for (Entry<byte[], StoreFlushContext> entry
4810             : prepareFlushResult.storeFlushCtxs.entrySet()) {
4811           Store store = getStore(entry.getKey());
4812           if (store == null) {
4813             continue;
4814           }
4815           if (store.getSnapshotSize() > 0) {
4816             canDrop = false;
4817             break;
4818           }
4819         }
4820       }
4821 
4822       
4823       
4824       if (canDrop) {
4825         writestate.flushing = false;
4826         this.prepareFlushResult = null;
4827       }
4828     }
4829   }
4830 
  /**
   * Refreshes store files from the file system and drops memstore snapshot or
   * contents that the newly-picked-up files already cover. Only runs on
   * secondary region replicas; on the primary it is a no-op returning false.
   *
   * @return true if any memstore bytes were freed by the refresh
   * @throws IOException if refreshing a store's files fails
   */
  @Override
  public boolean refreshStoreFiles() throws IOException {
    if (ServerRegionReplicaUtil.isDefaultReplica(this.getRegionInfo())) {
      return false; // if primary nothing to do
    }

    if (LOG.isDebugEnabled()) {
      LOG.debug(getRegionInfo().getEncodedName() + " : "
          + "Refreshing store files to see whether we can free up memstore");
    }

    long totalFreedSize = 0;

    // Smallest post-refresh max-seqId across all stores; used to advance
    // lastReplayedOpenRegionSeqId at the end.
    long smallestSeqIdInStores = Long.MAX_VALUE;

    startRegionOperation(); // obtain region close lock
    try {
      synchronized (writestate) {
        for (Store store : getStores()) {
          // Remember the store's max sequence id before the refresh so we can tell
          // whether new files were picked up.
          long maxSeqIdBefore = store.getMaxSequenceId();

          // refresh the store files; similar to observing a region-open WAL marker
          store.refreshStoreFiles();

          long storeSeqId = store.getMaxSequenceId();
          if (storeSeqId < smallestSeqIdInStores) {
            smallestSeqIdInStores = storeSeqId;
          }

          // New files were picked up: see whether memstore snapshot/contents can be dropped
          if (storeSeqId > maxSeqIdBefore) {

            if (writestate.flushing) {
              // Only abort the prepared snapshot if the flushed files cover it.
              if (this.prepareFlushResult.flushOpSeqId <= storeSeqId) {
                StoreFlushContext ctx = this.prepareFlushResult.storeFlushCtxs == null ?
                    null : this.prepareFlushResult.storeFlushCtxs.get(store.getFamily().getName());
                if (ctx != null) {
                  long snapshotSize = store.getFlushableSize();
                  ctx.abort();
                  this.addAndGetGlobalMemstoreSize(-snapshotSize);
                  this.prepareFlushResult.storeFlushCtxs.remove(store.getFamily().getName());
                  totalFreedSize += snapshotSize;
                }
              }
            }

            // Drop the memstore contents now covered by the refreshed files.
            totalFreedSize += dropMemstoreContentsForSeqId(storeSeqId, store);
          }
        }

        // If every store's snapshot is now empty, drop the prepared flush result
        // entirely (clears writestate.flushing).
        dropPrepareFlushIfNeeded: // see dropPrepareFlushIfPossible
        dropPrepareFlushIfPossible();

        // Advance the MVCC read point to each store's max memstore timestamp so
        // the refreshed data becomes visible to readers.
        for (Store s : getStores()) {
          getMVCC().advanceMemstoreReadPointIfNeeded(s.getMaxMemstoreTS());
        }

        // smallestSeqIdInStores is a seqId we have corresponding hfiles for; edits
        // with smaller seqIds can be skipped on replay, so advance
        // lastReplayedOpenRegionSeqId up to it.
        if (this.lastReplayedOpenRegionSeqId < smallestSeqIdInStores) {
          this.lastReplayedOpenRegionSeqId = smallestSeqIdInStores;
        }
      }
      // Finally notify anyone waiting on this region's memstore to shrink
      // (threads blocked in wait() on this region object).
      synchronized (this) {
        notifyAll(); // FindBugs NN_NAKED_NOTIFY
      }
      return totalFreedSize > 0;
    } finally {
      closeRegionOperation();
    }
  }
4914 
4915   private void logRegionFiles() {
4916     if (LOG.isTraceEnabled()) {
4917       LOG.trace(getRegionInfo().getEncodedName() + " : Store files for region: ");
4918       for (Store s : stores.values()) {
4919         Collection<StoreFile> storeFiles = s.getStorefiles();
4920         if (storeFiles == null) continue;
4921         for (StoreFile sf : storeFiles) {
4922           LOG.trace(getRegionInfo().getEncodedName() + " : " + sf);
4923         }
4924       }
4925     }
4926   }
4927 
4928   
4929 
4930 
4931   private void checkTargetRegion(byte[] encodedRegionName, String exceptionMsg, Object payload)
4932       throws WrongRegionException {
4933     if (Bytes.equals(this.getRegionInfo().getEncodedNameAsBytes(), encodedRegionName)) {
4934       return;
4935     }
4936 
4937     if (!RegionReplicaUtil.isDefaultReplica(this.getRegionInfo()) &&
4938         Bytes.equals(encodedRegionName,
4939           this.fs.getRegionInfoForFS().getEncodedNameAsBytes())) {
4940       return;
4941     }
4942 
4943     throw new WrongRegionException(exceptionMsg + payload
4944       + " targetted for region " + Bytes.toStringBinary(encodedRegionName)
4945       + " does not match this region: " + this.getRegionInfo());
4946   }
4947 
4948   
4949 
4950 
4951 
4952 
4953 
4954   protected boolean restoreEdit(final Store s, final Cell cell) {
4955     long kvSize = s.add(cell).getFirst();
4956     if (this.rsAccounting != null) {
4957       rsAccounting.addAndGetRegionReplayEditsSize(getRegionInfo().getRegionName(), kvSize);
4958     }
4959     return isFlushSize(this.addAndGetGlobalMemstoreSize(kvSize));
4960   }
4961 
4962   
4963 
4964 
4965 
4966 
4967 
4968   private static boolean isZeroLengthThenDelete(final FileSystem fs, final Path p)
4969       throws IOException {
4970     FileStatus stat = fs.getFileStatus(p);
4971     if (stat.getLen() > 0) return false;
4972     LOG.warn("File " + p + " is zero-length, deleting.");
4973     fs.delete(p, false);
4974     return true;
4975   }
4976 
  /**
   * Creates the Store implementation for the given column family.
   * Subclasses may override to supply a different Store implementation.
   */
  protected HStore instantiateHStore(final HColumnDescriptor family) throws IOException {
    return new HStore(this, family, this.conf);
  }
4980 
  /**
   * Returns the store for the given column family name, or null when this
   * region has no such family.
   */
  @Override
  public Store getStore(final byte[] column) {
    return this.stores.get(column);
  }
4985 
  /** Returns the map of currently held row lock contexts, keyed by hashed row. */
  public ConcurrentHashMap<HashedBytes, RowLockContext> getLockedRows() {
    return lockedRows;
  }
4989 
4990   
4991 
4992 
4993 
4994   private Store getStore(Cell cell) {
4995     for (Map.Entry<byte[], Store> famStore : stores.entrySet()) {
4996       if (Bytes.equals(
4997           cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(),
4998           famStore.getKey(), 0, famStore.getKey().length)) {
4999         return famStore.getValue();
5000       }
5001     }
5002 
5003     return null;
5004   }
5005 
5006   @Override
5007   public List<Store> getStores() {
5008     List<Store> list = new ArrayList<Store>(stores.size());
5009     list.addAll(stores.values());
5010     return list;
5011   }
5012 
5013   @Override
5014   public List<String> getStoreFileList(final byte [][] columns)
5015     throws IllegalArgumentException {
5016     List<String> storeFileNames = new ArrayList<String>();
5017     synchronized(closeLock) {
5018       for(byte[] column : columns) {
5019         Store store = this.stores.get(column);
5020         if (store == null) {
5021           throw new IllegalArgumentException("No column family : " +
5022               new String(column) + " available");
5023         }
5024         Collection<StoreFile> storeFiles = store.getStorefiles();
5025         if (storeFiles == null) continue;
5026         for (StoreFile storeFile: storeFiles) {
5027           storeFileNames.add(storeFile.getPath().toString());
5028         }
5029 
5030         logRegionFiles();
5031       }
5032     }
5033     return storeFileNames;
5034   }
5035 
5036   
5037   
5038   
5039 
5040   
5041   void checkRow(final byte [] row, String op) throws IOException {
5042     if (!rowIsInRange(getRegionInfo(), row)) {
5043       throw new WrongRegionException("Requested row out of range for " +
5044           op + " on HRegion " + this + ", startKey='" +
5045           Bytes.toStringBinary(getRegionInfo().getStartKey()) + "', getEndKey()='" +
5046           Bytes.toStringBinary(getRegionInfo().getEndKey()) + "', row='" +
5047           Bytes.toStringBinary(row) + "'");
5048     }
5049   }
5050 
  /**
   * Acquires a lock on the given row while holding the region operation lock.
   *
   * @param row the row to lock
   * @param waitForLock when false, return null immediately if the row is already locked
   * @return the acquired lock, or null when waitForLock is false and the row is held
   * @throws IOException if acquiring the lock fails or times out
   */
  @Override
  public RowLock getRowLock(byte[] row, boolean waitForLock) throws IOException {
    startRegionOperation();
    try {
      return getRowLockInternal(row, waitForLock);
    } finally {
      closeRegionOperation();
    }
  }
5060 
5061   
5062 
5063 
5064 
  /**
   * Acquires a lock on the given row, either installing a fresh lock context or
   * re-entering a context already owned by the current thread.
   *
   * @param row the row to lock
   * @param waitForLock whether to block (up to rowLockWaitDuration ms per attempt)
   *          when another thread holds the lock; when false, returns null instead
   * @return the acquired RowLock, or null if waitForLock is false and the row is held
   * @throws IOException if the wait times out, or InterruptedIOException on interrupt
   */
  protected RowLock getRowLockInternal(byte[] row, boolean waitForLock) throws IOException {
    HashedBytes rowKey = new HashedBytes(row);
    RowLockContext rowLockContext = new RowLockContext(rowKey);

    // loop until we acquire the row lock (unless !waitForLock)
    while (true) {
      // Atomically attempt to install our context; a non-null return means the row is taken.
      RowLockContext existingContext = lockedRows.putIfAbsent(rowKey, rowLockContext);
      if (existingContext == null) {
        // Row is not already locked by any thread; use the newly created context.
        break;
      } else if (existingContext.ownedByCurrentThread()) {
        // Row is already locked by the current thread; re-enter its existing context.
        rowLockContext = existingContext;
        break;
      } else {
        if (!waitForLock) {
          return null;
        }
        TraceScope traceScope = null;
        try {
          if (Trace.isTracing()) {
            traceScope = Trace.startSpan("HRegion.getRowLockInternal");
          }
          // Row is locked by another thread: wait for its latch, then retry the loop.
          if (!existingContext.latch.await(this.rowLockWaitDuration, TimeUnit.MILLISECONDS)) {
            if(traceScope != null) {
              traceScope.getSpan().addTimelineAnnotation("Failed to get row lock");
            }
            throw new IOException("Timed out waiting for lock for row: " + rowKey);
          }
          if (traceScope != null) traceScope.close();
          traceScope = null;
        } catch (InterruptedException ie) {
          // NOTE(review): the thread's interrupt status is not restored here;
          // consider Thread.currentThread().interrupt() before rethrowing.
          LOG.warn("Thread interrupted waiting for lock on row: " + rowKey);
          InterruptedIOException iie = new InterruptedIOException();
          iie.initCause(ie);
          throw iie;
        } finally {
          if (traceScope != null) traceScope.close();
        }
      }
    }

    // Hand out a lock handle bound to the (new or re-entered) context.
    return rowLockContext.newLock();
  }
5111 
5112   
5113 
5114 
5115 
5116 
5117 
  /**
   * Acquires the lock for the given row, waiting for it if necessary.
   * Equivalent to {@code getRowLock(row, true)}.
   */
  public RowLock getRowLock(byte[] row) throws IOException {
    return getRowLock(row, true);
  }
5121 
5122   @Override
5123   public void releaseRowLocks(List<RowLock> rowLocks) {
5124     if (rowLocks != null) {
5125       for (RowLock rowLock : rowLocks) {
5126         rowLock.release();
5127       }
5128       rowLocks.clear();
5129     }
5130   }
5131 
5132   
5133 
5134 
5135 
5136 
5137 
5138   private static boolean hasMultipleColumnFamilies(Collection<Pair<byte[], String>> familyPaths) {
5139     boolean multipleFamilies = false;
5140     byte[] family = null;
5141     for (Pair<byte[], String> pair : familyPaths) {
5142       byte[] fam = pair.getFirst();
5143       if (family == null) {
5144         family = fam;
5145       } else if (!Bytes.equals(family, fam)) {
5146         multipleFamilies = true;
5147         break;
5148       }
5149     }
5150     return multipleFamilies;
5151   }
5152 
  /**
   * Bulk loads the given family/HFile-path pairs into this region.
   *
   * @param familyPaths pairs of (column family, HFile path) to load
   * @param assignSeqId if true, flush first and stamp the loaded files with the
   *          resulting flush sequence id
   * @param bulkLoadListener optional callbacks invoked before/after/on-failure of
   *          each file load
   * @return true if all files were loaded; false on a recoverable validation
   *         failure (e.g. a file no longer fits this region after a split)
   * @throws IOException on validation IO errors or load failure
   */
  @Override
  public boolean bulkLoadHFiles(Collection<Pair<byte[], String>> familyPaths, boolean assignSeqId,
      BulkLoadListener bulkLoadListener) throws IOException {
    long seqId = -1;
    Map<byte[], List<Path>> storeFiles = new TreeMap<byte[], List<Path>>(Bytes.BYTES_COMPARATOR);
    Preconditions.checkNotNull(familyPaths);
    // Take the bulk region lock; a multi-family load needs the stricter variant.
    startBulkRegionOperation(hasMultipleColumnFamilies(familyPaths));
    try {
      this.writeRequestsCount.increment();

      // First pass: validate every (family, file) pair before loading anything,
      // separating recoverable range mismatches from hard IO failures.
      List<IOException> ioes = new ArrayList<IOException>();
      List<Pair<byte[], String>> failures = new ArrayList<Pair<byte[], String>>();
      for (Pair<byte[], String> p : familyPaths) {
        byte[] familyName = p.getFirst();
        String path = p.getSecond();

        Store store = getStore(familyName);
        if (store == null) {
          IOException ioe = new org.apache.hadoop.hbase.DoNotRetryIOException(
              "No such column family " + Bytes.toStringBinary(familyName));
          ioes.add(ioe);
        } else {
          try {
            store.assertBulkLoadHFileOk(new Path(path));
          } catch (WrongRegionException wre) {
            // recoverable: the file does not fit this region's range (likely a split)
            failures.add(p);
          } catch (IOException ioe) {
            // unrecoverable IO failure
            ioes.add(ioe);
          }
        }
      }

      // Validation hit hard IO errors: combine and rethrow.
      if (ioes.size() != 0) {
        IOException e = MultipleIOException.createIOException(ioes);
        LOG.error("There were one or more IO errors when checking if the bulk load is ok.", e);
        throw e;
      }

      // Recoverable validation failures: report and bail out without loading anything.
      if (failures.size() != 0) {
        StringBuilder list = new StringBuilder();
        for (Pair<byte[], String> p : failures) {
          list.append("\n").append(Bytes.toString(p.getFirst())).append(" : ")
              .append(p.getSecond());
        }
        // problem when validating
        LOG.warn("There was a recoverable bulk load failure likely due to a" +
            " split.  These (family, HFile) pairs were not loaded: " + list);
        return false;
      }

      // If requested, flush first so the loaded files get a sequence id that sits
      // between the flushed data and the live memstore.
      if (assignSeqId) {
        FlushResult fs = flushcache(true, false);
        if (fs.isFlushSucceeded()) {
          seqId = ((FlushResultImpl)fs).flushSequenceId;
        } else if (fs.getResult() == FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY) {
          // Memstore was empty; the reported flush sequence id is still usable.
          seqId = ((FlushResultImpl)fs).flushSequenceId;
        } else {
          throw new IOException("Could not bulk load with an assigned sequential ID because the "+
            "flush didn't run. Reason for not flushing: " + ((FlushResultImpl)fs).failureReason);
        }
      }

      for (Pair<byte[], String> p : familyPaths) {
        byte[] familyName = p.getFirst();
        String path = p.getSecond();
        Store store = getStore(familyName);
        try {
          String finalPath = path;
          if (bulkLoadListener != null) {
            finalPath = bulkLoadListener.prepareBulkLoad(familyName, path);
          }
          Path commitedStoreFile = store.bulkLoadHFile(finalPath, seqId);

          // Record the committed file per family for the WAL marker written below.
          if(storeFiles.containsKey(familyName)) {
            storeFiles.get(familyName).add(commitedStoreFile);
          } else {
            List<Path> storeFileNames = new ArrayList<Path>();
            storeFileNames.add(commitedStoreFile);
            storeFiles.put(familyName, storeFileNames);
          }
          if (bulkLoadListener != null) {
            bulkLoadListener.doneBulkLoad(familyName, path);
          }
        } catch (IOException ioe) {
          // A failure here is treated as unrecoverable: some families may already
          // have been loaded, so the load is only partially applied.

          // Notify the listener, but the original IOException is what gets thrown.
          LOG.error("There was a partial failure due to IO when attempting to" +
              " load " + Bytes.toString(p.getFirst()) + " : " + p.getSecond(), ioe);
          if (bulkLoadListener != null) {
            try {
              bulkLoadListener.failedBulkLoad(familyName, path);
            } catch (Exception ex) {
              LOG.error("Error while calling failedBulkLoad for family " +
                  Bytes.toString(familyName) + " with path " + path, ex);
            }
          }
          throw ioe;
        }
      }

      return true;
    } finally {
      if (wal != null && !storeFiles.isEmpty()) {
        // Write a bulk load marker to the WAL naming every committed file.
        try {
          WALProtos.BulkLoadDescriptor loadDescriptor = ProtobufUtil.toBulkLoadDescriptor(
              this.getRegionInfo().getTable(),
              ByteStringer.wrap(this.getRegionInfo().getEncodedNameAsBytes()), storeFiles, seqId);
          WALUtil.writeBulkLoadMarkerAndSync(wal, this.htableDescriptor, getRegionInfo(),
              loadDescriptor, sequenceId);
        } catch (IOException ioe) {
          if (this.rsServices != null) {
            // Abort the server: files were committed but the WAL marker could not
            // be written, leaving the WAL inconsistent with the store.
            this.rsServices.abort("Failed to write bulk load event into WAL.", ioe);
          }
        }
      }

      closeBulkRegionOperation();
    }
  }
5290 
5291   @Override
5292   public boolean equals(Object o) {
5293     return o instanceof HRegion && Bytes.equals(getRegionInfo().getRegionName(),
5294                                                 ((HRegion) o).getRegionInfo().getRegionName());
5295   }
5296 
  /** Hash of the region name; consistent with {@link #equals(Object)}. */
  @Override
  public int hashCode() {
    return Bytes.hashCode(getRegionInfo().getRegionName());
  }
5301 
  /** Returns the region name as a human-readable string. */
  @Override
  public String toString() {
    return getRegionInfo().getRegionNameAsString();
  }
5306 
5307   
5308 
5309 
5310   class RegionScannerImpl implements RegionScanner {
5311     
    // Heap over the essential-family scanners; drives the main scan.
    KeyValueHeap storeHeap = null;
    // Heap over non-essential-family scanners (populated lazily when the filter
    // marks families non-essential); stays null when unused.
    KeyValueHeap joinedHeap = null;
    // Row currently being completed from the joined heap when gathering was
    // interrupted by a limit; null when no joined-heap continuation is pending.
    protected Cell joinedContinuationRow = null;
    // Exclusive stop row; null means scan to the end of the region.
    protected final byte[] stopRow;
    // Wrapped user filter, or null when the scan has no filter.
    private final FilterWrapper filter;
    // Context used by next/nextRaw overloads that take no explicit context;
    // carries the scan's batch limit.
    private ScannerContext defaultScannerContext;
    // -1 for a Get-style scan, 0 otherwise (set in the constructor).
    protected int isScan;
    private boolean filterClosed = false;
    // MVCC read point this scanner reads at.
    private long readPt;
    private long maxResultSize;
    protected HRegion region;
5328 
    /** Returns the HRegionInfo of the region this scanner iterates over. */
    @Override
    public HRegionInfo getRegionInfo() {
      return region.getRegionInfo();
    }
5333 
    /**
     * Builds a scanner over the region for the given Scan: records the MVCC read
     * point, opens one store scanner per requested family, and partitions them
     * into the essential (storeHeap) and non-essential (joinedHeap) sets.
     *
     * @param scan the client scan specification
     * @param additionalScanners extra scanners to include in the store heap; may be null
     * @param region the region being scanned
     * @throws IOException if store scanner creation or heap initialization fails
     */
    RegionScannerImpl(Scan scan, List<KeyValueScanner> additionalScanners, HRegion region)
        throws IOException {

      this.region = region;
      this.maxResultSize = scan.getMaxResultSize();
      if (scan.hasFilter()) {
        this.filter = new FilterWrapper(scan.getFilter());
      } else {
        this.filter = null;
      }

      // Default context used when callers invoke next/nextRaw without supplying
      // a ScannerContext; it enforces the scan's batch limit.
      defaultScannerContext = ScannerContext.newBuilder().setBatchLimit(scan.getBatch()).build();

      if (Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW) && !scan.isGetScan()) {
        this.stopRow = null;
      } else {
        this.stopRow = scan.getStopRow();
      }
      // A Get scan is marked with -1 so the stop row check is inclusive for it.
      this.isScan = scan.isGetScan() ? -1 : 0;

      // Register the read point under the scannerReadPoints lock so concurrent
      // read-point calculations see it.
      IsolationLevel isolationLevel = scan.getIsolationLevel();
      synchronized(scannerReadPoints) {
        this.readPt = getReadpoint(isolationLevel);
        scannerReadPoints.put(this, this.readPt);
      }

      // Scanners for families the filter needs (essential) vs. everything else (joined).
      List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>();
      List<KeyValueScanner> joinedScanners = new ArrayList<KeyValueScanner>();
      // Track everything opened so far so it can be closed on failure.
      List<KeyValueScanner> instantiatedScanners = new ArrayList<KeyValueScanner>();
      // Caller-supplied scanners always go into the main store heap.
      if (additionalScanners != null && !additionalScanners.isEmpty()) {
        scanners.addAll(additionalScanners);
        instantiatedScanners.addAll(additionalScanners);
      }

      try {
        for (Map.Entry<byte[], NavigableSet<byte[]>> entry : scan.getFamilyMap().entrySet()) {
          Store store = stores.get(entry.getKey());
          KeyValueScanner scanner = store.getScanner(scan, entry.getValue(), this.readPt);
          instantiatedScanners.add(scanner);
          if (this.filter == null || !scan.doLoadColumnFamiliesOnDemand()
              || this.filter.isFamilyEssential(entry.getKey())) {
            scanners.add(scanner);
          } else {
            joinedScanners.add(scanner);
          }
        }
        initializeKVHeap(scanners, joinedScanners, region);
      } catch (Throwable t) {
        // Close anything already opened and surface the failure as an IOException.
        throw handleException(instantiatedScanners, t);
      }
    }
5398 
5399     protected void initializeKVHeap(List<KeyValueScanner> scanners,
5400         List<KeyValueScanner> joinedScanners, HRegion region)
5401         throws IOException {
5402       this.storeHeap = new KeyValueHeap(scanners, region.comparator);
5403       if (!joinedScanners.isEmpty()) {
5404         this.joinedHeap = new KeyValueHeap(joinedScanners, region.comparator);
5405       }
5406     }
5407 
5408     private IOException handleException(List<KeyValueScanner> instantiatedScanners,
5409         Throwable t) {
5410       
5411       scannerReadPoints.remove(this);
5412       if (storeHeap != null) {
5413         storeHeap.close();
5414         storeHeap = null;
5415         if (joinedHeap != null) {
5416           joinedHeap.close();
5417           joinedHeap = null;
5418         }
5419       } else {
5420         
5421         for (KeyValueScanner scanner : instantiatedScanners) {
5422           scanner.close();
5423         }
5424       }
5425       return t instanceof IOException ? (IOException) t : new IOException(t);
5426     }
5427 
    /** Returns the maximum result size (in bytes) requested by the scan. */
    @Override
    public long getMaxResultSize() {
      return maxResultSize;
    }
5432 
    /** Returns the MVCC read point this scanner was created with. */
    @Override
    public long getMvccReadPoint() {
      return this.readPt;
    }
5437 
    /** Returns the per-call cell batch limit carried by the default scanner context. */
    @Override
    public int getBatch() {
      return this.defaultScannerContext.getBatchLimit();
    }
5442 
5443     
5444 
5445 
5446 
5447 
    /**
     * Resets the wrapped filter, if any, so it can evaluate the next row afresh.
     *
     * @throws IOException in case a filter raises an I/O exception on reset
     */
    protected void resetFilters() throws IOException {
      if (filter != null) {
        filter.reset();
      }
    }
5453 
    /**
     * Returns the next row of results using the default scanner context, which
     * enforces the scan's batch limit.
     */
    @Override
    public boolean next(List<Cell> outResults)
        throws IOException {
      // apply the batching limit by default
      return next(outResults, defaultScannerContext);
    }
5460 
    /**
     * Synchronized next: verifies the scanner is still open, takes the region
     * operation lock, bumps the read counter, and delegates to nextRaw.
     *
     * @throws UnknownScannerException if the scanner was closed (e.g. timed out)
     */
    @Override
    public synchronized boolean next(List<Cell> outResults, ScannerContext scannerContext) throws IOException {
      if (this.filterClosed) {
        throw new UnknownScannerException("Scanner was closed (timed out?) " +
            "after we renewed it. Could be caused by a very slow scanner " +
            "or a lengthy garbage collection");
      }
      startRegionOperation(Operation.SCAN);
      readRequestsCount.increment();
      try {
        return nextRaw(outResults, scannerContext);
      } finally {
        closeRegionOperation(Operation.SCAN);
      }
    }
5476 
    /** nextRaw using the default scanner context (enforces the scan's batch limit). */
    @Override
    public boolean nextRaw(List<Cell> outResults) throws IOException {
      // apply the batching limit by default
      return nextRaw(outResults, defaultScannerContext);
    }
5482 
    /**
     * Grabs the next row's worth of values without taking the region operation
     * lock or bumping read metrics — callers are responsible for those.
     *
     * @param outResults output list; filled in place when empty on entry
     * @param scannerContext limit/progress tracker for this invocation
     * @return true if more values exist after this call
     * @throws UnknownScannerException if the scanner has been closed
     */
    @Override
    public boolean nextRaw(List<Cell> outResults, ScannerContext scannerContext)
        throws IOException {
      if (storeHeap == null) {
        // scanner is closed
        throw new UnknownScannerException("Scanner was closed");
      }
      boolean moreValues;
      if (outResults.isEmpty()) {
        // Usual case: fill the caller's list directly, avoiding a copy.
        moreValues = nextInternal(outResults, scannerContext);
      } else {
        // Caller passed a non-empty list: gather into a temp list then append,
        // because nextInternal requires an empty list.
        List<Cell> tmpList = new ArrayList<Cell>();
        moreValues = nextInternal(tmpList, scannerContext);
        outResults.addAll(tmpList);
      }

      // A mid-row result means a partial Result is being returned; filters must
      // only be reset between rows, so skip the reset in that case.
      if (!scannerContext.midRowResultFormed()) resetFilters();

      if (isFilterDoneInternal()) {
        moreValues = false;
      }
      return moreValues;
    }
5511 
5512     
5513 
5514 
    /**
     * Continues gathering cells for {@code joinedContinuationRow} from the joined
     * (non-essential families) heap into {@code results}. The continuation row is
     * cleared once no between-cells limit interrupted the gathering. Results are
     * re-sorted because joined-heap cells arrive out of order relative to cells
     * already collected from the store heap.
     *
     * @return true if more values remain on the joined heap
     */
    private boolean populateFromJoinedHeap(List<Cell> results, ScannerContext scannerContext)
            throws IOException {
      assert joinedContinuationRow != null;
      boolean moreValues =
          populateResult(results, this.joinedHeap, scannerContext,
          joinedContinuationRow.getRowArray(), joinedContinuationRow.getRowOffset(),
          joinedContinuationRow.getRowLength());

      if (!scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {
        // The row was fully read; no continuation is pending anymore.
        joinedContinuationRow = null;
      }
      // As the data is obtained from two independent heaps, we need to
      // ensure that result list is sorted, because Result relies on that.
      Collections.sort(results, comparator);
      return moreValues;
    }
5532 
5533     
5534 
5535 
5536 
5537 
5538 
5539 
5540 
5541 
5542 
    /**
     * Fetches cells for the row identified by currentRow/offset/length from the
     * given heap into {@code results}, stopping when the row is exhausted or a
     * batch/size/time limit is reached.
     *
     * @param results output list of cells
     * @param heap heap to read from (store heap or joined heap)
     * @param scannerContext tracks batch/size/time limits and progress
     * @param currentRow backing array containing the row key
     * @param offset offset of the row key within currentRow
     * @param length length of the row key
     * @return true if more cells exist (in this row or a following one)
     */
    private boolean populateResult(List<Cell> results, KeyValueHeap heap,
        ScannerContext scannerContext, byte[] currentRow, int offset, short length)
        throws IOException {
      Cell nextKv;
      boolean moreCellsInRow = false;
      boolean tmpKeepProgress = scannerContext.getKeepProgress();
      // Scanning between column families, so limits apply between cells.
      LimitScope limitScope = LimitScope.BETWEEN_CELLS;
      do {
        // Keep progress accumulated across column families: force the keep-progress
        // flag on for the heap call, then restore the caller's setting.
        scannerContext.setKeepProgress(true);
        heap.next(results, scannerContext);
        scannerContext.setKeepProgress(tmpKeepProgress);

        nextKv = heap.peek();
        moreCellsInRow = moreCellsInRow(nextKv, currentRow, offset, length);

        if (moreCellsInRow && scannerContext.checkBatchLimit(limitScope)) {
          return scannerContext.setScannerState(NextState.BATCH_LIMIT_REACHED).hasMoreValues();
        } else if (scannerContext.checkSizeLimit(limitScope)) {
          // MID_ROW variants tell the caller the row is only partially read.
          ScannerContext.NextState state =
              moreCellsInRow ? NextState.SIZE_LIMIT_REACHED_MID_ROW : NextState.SIZE_LIMIT_REACHED;
          return scannerContext.setScannerState(state).hasMoreValues();
        } else if (scannerContext.checkTimeLimit(limitScope)) {
          ScannerContext.NextState state =
              moreCellsInRow ? NextState.TIME_LIMIT_REACHED_MID_ROW : NextState.TIME_LIMIT_REACHED;
          return scannerContext.setScannerState(state).hasMoreValues();
        }
      } while (moreCellsInRow);

      // Row exhausted: more values exist iff the heap still has a next cell.
      return nextKv != null;
    }
5577 
5578     
5579 
5580 
5581 
5582 
5583 
5584 
5585 
5586 
5587 
5588     private boolean moreCellsInRow(final Cell nextKv, byte[] currentRow, int offset,
5589         short length) {
5590       return nextKv != null && CellUtil.matchingRow(nextKv, currentRow, offset, length);
5591     }
5592 
5593     
5594 
5595 
    /** Synchronized accessor reporting whether the filter has exhausted all rows. */
    @Override
    public synchronized boolean isFilterDone() throws IOException {
      return isFilterDoneInternal();
    }
5600 
    /** True when a filter is set and it reports all remaining rows filtered out. */
    private boolean isFilterDoneInternal() throws IOException {
      return this.filter != null && this.filter.filterAllRemaining();
    }
5604 
    /**
     * Core row-assembly loop for the region scanner. Fills {@code results}
     * with the cells of (at most) one row from the store heap and, when a
     * joined (essential/non-essential family split) heap exists, from that
     * heap as well, honoring the batch/size/time limits carried by
     * {@code scannerContext}.
     *
     * @param results must be an empty list; populated with the row's cells
     * @param scannerContext carries limits and accumulates scan progress
     * @return true when there are more values to fetch after this call
     * @throws IOException on heap/filter failure or client disconnect
     */
    private boolean nextInternal(List<Cell> results, ScannerContext scannerContext)
        throws IOException {
      if (!results.isEmpty()) {
        throw new IllegalArgumentException("First parameter should be an empty list");
      }
      if (scannerContext == null) {
        throw new IllegalArgumentException("Scanner context cannot be null");
      }
      RpcCallContext rpcCall = RpcServer.getCurrentCall();

      // Save the initial progress from the ScannerContext in local variables. The
      // progress may need to be reset several times if rows are filtered out, so
      // keep the values seen at method entry.
      int initialBatchProgress = scannerContext.getBatchProgress();
      long initialSizeProgress = scannerContext.getSizeProgress();
      long initialTimeProgress = scannerContext.getTimeProgress();

      // The loop is only re-entered when, due to filters or empty rows, the row
      // just examined produced no results and we must try the next row.
      // Otherwise we exit on the first iteration via one of the return paths:
      // "true" if there's more data to read, "false" if there isn't (stop row
      // reached and the joined heap, if any, is drained).
      while (true) {
        // Starting a new row: reset scanner progress according to whether
        // progress should be kept across rows.
        if (scannerContext.getKeepProgress()) {
          // Progress should be kept: reset to the values seen at method entry.
          scannerContext
              .setProgress(initialBatchProgress, initialSizeProgress, initialTimeProgress);
        } else {
          scannerContext.clearProgress();
        }

        if (rpcCall != null) {
          // If the client has disconnected while the server is still working
          // (e.g. a slow filter), abort aggressively rather than burning
          // server resources on a response nobody will read.
          long afterTime = rpcCall.disconnectSince();
          if (afterTime >= 0) {
            throw new CallerDisconnectedException(
                "Aborting on region " + getRegionInfo().getRegionNameAsString() + ", call " +
                    this + " after " + afterTime + " ms, since " +
                    "caller disconnected");
          }
        }

        // Peek at the next cell in the main (store) heap.
        Cell current = this.storeHeap.peek();

        byte[] currentRow = null;
        int offset = 0;
        short length = 0;
        if (current != null) {
          currentRow = current.getRowArray();
          offset = current.getRowOffset();
          length = current.getRowLength();
        }

        boolean stopRow = isStopRow(currentRow, offset, length);
        // When hasFilterRow is true, ALL cells of a row must be read before a
        // filtering decision can be made, so partial (mid-row) results are not
        // allowed for such filters.
        boolean hasFilterRow = this.filter != null && this.filter.hasFilterRow();

        // If partial results are not allowed, widen the scope of any limit
        // that could otherwise trigger mid-row so it only triggers between rows.
        if (hasFilterRow) {
          if (LOG.isTraceEnabled()) {
            LOG.trace("filter#hasFilterRow is true which prevents partial results from being "
                + " formed. Changing scope of limits that may create partials");
          }
          scannerContext.setSizeLimitScope(LimitScope.BETWEEN_ROWS);
          scannerContext.setTimeLimitScope(LimitScope.BETWEEN_ROWS);
        }

        // joinedContinuationRow != null means a previous call stopped mid-row
        // while draining the joined heap; otherwise take the main path.
        if (joinedContinuationRow == null) {
          // First, check whether we are at a stop row: if so, no more results.
          if (stopRow) {
            if (hasFilterRow) {
              filter.filterRowCells(results);
            }
            return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
          }

          // Let the row-key filter exclude the whole row up front; if it does,
          // skip to the next row and loop.
          if (filterRowKey(currentRow, offset, length)) {
            boolean moreRows = !isFilterDoneInternal() && nextRow(currentRow, offset, length);
            if (!moreRows) {
              return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
            }
            results.clear();
            continue;
          }

          // Pull this row's cells from the main heap into results.
          populateResult(results, this.storeHeap, scannerContext, currentRow, offset, length);

          if (scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {
            // Mid-row limit hit. Incompatible with whole-row filters (see above).
            if (hasFilterRow) {
              throw new IncompatibleFilterException(
                  "Filter whose hasFilterRow() returns true is incompatible with scans that must "
                      + " stop mid-row because of a limit. ScannerContext:" + scannerContext);
            }
            return true;
          }

          Cell nextKv = this.storeHeap.peek();
          stopRow = nextKv == null ||
              isStopRow(nextKv.getRowArray(), nextKv.getRowOffset(), nextKv.getRowLength());
          // Record whether the row was empty BEFORE row filters run on it.
          final boolean isEmptyRow = results.isEmpty();

          // We now have enough of the row for filtering (usually all of it);
          // apply the whole-row filter if there is one.
          FilterWrapper.FilterRowRetCode ret = FilterWrapper.FilterRowRetCode.NOT_CALLED;
          if (hasFilterRow) {
            ret = filter.filterRowCellsWithRet(results);

            // The filter may have changed the result list arbitrarily, so
            // recompute batch/size progress from the list's current contents.
            // Time progress is unaffected by result changes, so preserve it.
            long timeProgress = scannerContext.getTimeProgress();
            if (scannerContext.getKeepProgress()) {
              scannerContext.setProgress(initialBatchProgress, initialSizeProgress,
                initialTimeProgress);
            } else {
              scannerContext.clearProgress();
            }
            scannerContext.setTimeProgress(timeProgress);
            scannerContext.incrementBatchProgress(results.size());
            for (Cell cell : results) {
              scannerContext.incrementSizeProgress(CellUtil.estimatedHeapSizeOfWithoutTags(cell));
            }
          }

          if ((isEmptyRow || ret == FilterWrapper.FilterRowRetCode.EXCLUDE) || filterRow()) {
            // Row was entirely filtered out; advance to the next row.
            results.clear();
            boolean moreRows = nextRow(currentRow, offset, length);
            if (!moreRows) {
              return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
            }

            // If this is NOT the last row, loop and try the next one;
            // otherwise there is nothing else to do.
            if (!stopRow) continue;
            return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
          }

          // Done with the store heap for this row. Non-essential column
          // families (not needed by the filter) live in the joined heap; their
          // fetch is postponed to here to (possibly) avoid disk reads for
          // rows the filter would have discarded.
          if (this.joinedHeap != null) {
            boolean mayHaveData = joinedHeapMayHaveData(currentRow, offset, length);
            if (mayHaveData) {
              joinedContinuationRow = current;
              populateFromJoinedHeap(results, scannerContext);

              if (scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {
                return true;
              }
            }
          }
        } else {
          // A previous call stopped mid-row in the joined heap; keep draining it.
          populateFromJoinedHeap(results, scannerContext);
          if (scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {
            return true;
          }
        }
        // If populateFromJoinedHeap hit a limit, joinedContinuationRow stays
        // set and the next next() invocation resumes from it.
        if (joinedContinuationRow != null) {
          return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
        }

        // Both heaps are done for this row. Guard against returning an empty
        // row (can happen e.g. with SingleColumnValueExcludeFilter).
        if (results.isEmpty()) {
          boolean moreRows = nextRow(currentRow, offset, length);
          if (!moreRows) {
            return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
          }
          if (!stopRow) continue;
        }

        // Done: report whether the scan can continue past this row.
        if (stopRow) {
          return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
        } else {
          return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
        }
      }
    }
5807 
5808     
5809 
5810 
5811 
5812 
5813 
5814 
5815     private boolean joinedHeapMayHaveData(byte[] currentRow, int offset, short length)
5816         throws IOException {
5817       Cell nextJoinedKv = joinedHeap.peek();
5818       boolean matchCurrentRow =
5819           nextJoinedKv != null && CellUtil.matchingRow(nextJoinedKv, currentRow, offset, length);
5820       boolean matchAfterSeek = false;
5821 
5822       
5823       
5824       if (!matchCurrentRow) {
5825         Cell firstOnCurrentRow = KeyValueUtil.createFirstOnRow(currentRow, offset, length);
5826         boolean seekSuccessful = this.joinedHeap.requestSeek(firstOnCurrentRow, true, true);
5827         matchAfterSeek =
5828             seekSuccessful && joinedHeap.peek() != null
5829                 && CellUtil.matchingRow(joinedHeap.peek(), currentRow, offset, length);
5830       }
5831 
5832       return matchCurrentRow || matchAfterSeek;
5833     }
5834 
5835     
5836 
5837 
5838 
5839 
5840 
5841 
5842     private boolean filterRow() throws IOException {
5843       
5844       
5845       return filter != null && (!filter.hasFilterRow())
5846           && filter.filterRow();
5847     }
5848 
5849     private boolean filterRowKey(byte[] row, int offset, short length) throws IOException {
5850       return filter != null
5851           && filter.filterRowKey(row, offset, length);
5852     }
5853 
5854     protected boolean nextRow(byte [] currentRow, int offset, short length) throws IOException {
5855       assert this.joinedContinuationRow == null: "Trying to go to next row during joinedHeap read.";
5856       Cell next;
5857       while ((next = this.storeHeap.peek()) != null &&
5858              CellUtil.matchingRow(next, currentRow, offset, length)) {
5859         this.storeHeap.next(MOCKED_LIST);
5860       }
5861       resetFilters();
5862       
5863       return this.region.getCoprocessorHost() == null
5864           || this.region.getCoprocessorHost()
5865               .postScannerFilterRow(this, currentRow, offset, length);
5866     }
5867 
    /**
     * Returns true when scanning should stop at {@code currentRow}: either the
     * heap is exhausted (currentRow == null) or the row is at or past the
     * configured stop row.
     * NOTE(review): the comparison against 'isScan' suggests isScan shifts the
     * stop-row boundary between inclusive and exclusive (e.g. for get-style
     * scans) — confirm against the field's initialization, which is outside
     * this view.
     */
    protected boolean isStopRow(byte[] currentRow, int offset, short length) {
      return currentRow == null ||
          (stopRow != null &&
          comparator.compareRows(stopRow, 0, stopRow.length,
            currentRow, offset, length) <= isScan);
    }
5874 
5875     @Override
5876     public synchronized void close() {
5877       if (storeHeap != null) {
5878         storeHeap.close();
5879         storeHeap = null;
5880       }
5881       if (joinedHeap != null) {
5882         joinedHeap.close();
5883         joinedHeap = null;
5884       }
5885       
5886       scannerReadPoints.remove(this);
5887       this.filterClosed = true;
5888     }
5889 
    /** Exposes the store heap for tests; intentionally package-private. */
    KeyValueHeap getStoreHeapForTesting() {
      return storeHeap;
    }
5893 
5894     @Override
5895     public synchronized boolean reseek(byte[] row) throws IOException {
5896       if (row == null) {
5897         throw new IllegalArgumentException("Row cannot be null.");
5898       }
5899       boolean result = false;
5900       startRegionOperation();
5901       try {
5902         KeyValue kv = KeyValueUtil.createFirstOnRow(row);
5903         
5904         result = this.storeHeap.requestSeek(kv, true, true);
5905         if (this.joinedHeap != null) {
5906           result = this.joinedHeap.requestSeek(kv, true, true) || result;
5907         }
5908       } finally {
5909         closeRegionOperation();
5910       }
5911       return result;
5912     }
5913   }
5914 
5915   
5916   
5917 
5918 
5919 
5920 
5921 
5922 
5923 
5924 
5925 
5926 
5927 
5928 
5929 
5930 
5931 
5932 
5933 
5934   static HRegion newHRegion(Path tableDir, WAL wal, FileSystem fs,
5935       Configuration conf, HRegionInfo regionInfo, final HTableDescriptor htd,
5936       RegionServerServices rsServices) {
5937     try {
5938       @SuppressWarnings("unchecked")
5939       Class<? extends HRegion> regionClass =
5940           (Class<? extends HRegion>) conf.getClass(HConstants.REGION_IMPL, HRegion.class);
5941 
5942       Constructor<? extends HRegion> c =
5943           regionClass.getConstructor(Path.class, WAL.class, FileSystem.class,
5944               Configuration.class, HRegionInfo.class, HTableDescriptor.class,
5945               RegionServerServices.class);
5946 
5947       return c.newInstance(tableDir, wal, fs, conf, regionInfo, htd, rsServices);
5948     } catch (Throwable e) {
5949       
5950       throw new IllegalStateException("Could not instantiate a region instance.", e);
5951     }
5952   }
5953 
5954   
5955 
5956 
5957 
5958 
5959 
5960 
5961 
5962 
5963 
5964 
5965 
5966 
5967 
5968 
5969 
  /**
   * Convenience overload: creates the region with no pre-supplied WAL
   * (delegates with {@code wal == null}).
   */
  public static HRegion createHRegion(final HRegionInfo info, final Path rootDir,
      final Configuration conf, final HTableDescriptor hTableDescriptor)
  throws IOException {
    return createHRegion(info, rootDir, conf, hTableDescriptor, null);
  }
5975 
5976   
5977 
5978 
5979 
5980 
5981 
5982 
5983 
5984 
5985 
5986   public static void closeHRegion(final HRegion r) throws IOException {
5987     if (r == null) return;
5988     r.close();
5989     if (r.getWAL() == null) return;
5990     r.getWAL().close();
5991   }
5992 
5993   
5994 
5995 
5996 
5997 
5998 
5999 
6000 
6001 
6002 
6003 
6004 
6005 
  /**
   * Convenience overload: creates the region without the ignoreWAL option
   * (delegates with {@code ignoreWAL == false}).
   */
  public static HRegion createHRegion(final HRegionInfo info, final Path rootDir,
                                      final Configuration conf,
                                      final HTableDescriptor hTableDescriptor,
                                      final WAL wal,
                                      final boolean initialize)
      throws IOException {
    return createHRegion(info, rootDir, conf, hTableDescriptor,
        wal, initialize, false);
  }
6015 
6016   
6017 
6018 
6019 
6020 
6021 
6022 
6023 
6024 
6025 
6026 
6027 
6028 
6029 
6030   public static HRegion createHRegion(final HRegionInfo info, final Path rootDir,
6031                                       final Configuration conf,
6032                                       final HTableDescriptor hTableDescriptor,
6033                                       final WAL wal,
6034                                       final boolean initialize, final boolean ignoreWAL)
6035       throws IOException {
6036       Path tableDir = FSUtils.getTableDir(rootDir, info.getTable());
6037       return createHRegion(info, rootDir, tableDir, conf, hTableDescriptor, wal, initialize,
6038           ignoreWAL);
6039   }
6040 
6041   
6042 
6043 
6044 
6045 
6046 
6047 
6048 
6049 
6050 
6051 
6052 
6053 
6054 
6055 
  /**
   * Creates a new region on the filesystem and instantiates it, optionally
   * initializing it. When no WAL is supplied and {@code ignoreWAL} is false,
   * a private WAL is created for the region (the caller is then responsible
   * for closing it — see closeHRegion).
   *
   * @param info region to create
   * @param rootDir HBase root directory
   * @param tableDir directory of the region's table
   * @param conf configuration
   * @param hTableDescriptor table descriptor
   * @param wal shared WAL, or null to have one created (unless ignoreWAL)
   * @param initialize whether to initialize the region after instantiation
   * @param ignoreWAL when true, no WAL is created even if {@code wal} is null
   * @return the new region
   * @throws IOException on filesystem or initialization failure
   */
  public static HRegion createHRegion(final HRegionInfo info, final Path rootDir, final Path tableDir,
                                      final Configuration conf,
                                      final HTableDescriptor hTableDescriptor,
                                      final WAL wal,
                                      final boolean initialize, final boolean ignoreWAL)
      throws IOException {
    LOG.info("creating HRegion " + info.getTable().getNameAsString()
        + " HTD == " + hTableDescriptor + " RootDir = " + rootDir +
        " Table name == " + info.getTable().getNameAsString());
    FileSystem fs = FileSystem.get(conf);
    HRegionFileSystem.createRegionOnFileSystem(conf, fs, tableDir, info);
    WAL effectiveWAL = wal;
    if (wal == null && !ignoreWAL) {
      // The WAL subsystem would otherwise resolve its directory from the
      // default root dir; pass the requested rootDir along via a copied conf
      // so the private WAL lands under it.
      Configuration confForWAL = new Configuration(conf);
      confForWAL.set(HConstants.HBASE_DIR, rootDir.toString());
      // Randomized factory id keeps concurrently created private WALs apart.
      effectiveWAL = (new WALFactory(confForWAL,
          Collections.<WALActionsListener>singletonList(new MetricsWAL()),
          "hregion-" + RandomStringUtils.randomNumeric(8))).
            getWAL(info.getEncodedNameAsBytes());
    }
    HRegion region = HRegion.newHRegion(tableDir,
        effectiveWAL, fs, conf, info, hTableDescriptor, null);
    if (initialize) {
      // initialize(null) returns the region's open sequence number, which is
      // then installed as the region's sequence id.
      region.setSequenceId(region.initialize(null));
    }
    return region;
  }
6088 
  /**
   * Convenience overload: creates and initializes the region
   * (delegates with {@code initialize == true}).
   */
  public static HRegion createHRegion(final HRegionInfo info, final Path rootDir,
                                      final Configuration conf,
                                      final HTableDescriptor hTableDescriptor,
                                      final WAL wal)
    throws IOException {
    return createHRegion(info, rootDir, conf, hTableDescriptor, wal, true);
  }
6096 
6097 
6098   
6099 
6100 
6101 
6102 
6103 
6104 
6105 
6106 
6107 
6108 
  /**
   * Opens a region without region-server services or a progress reporter
   * (delegates with nulls for both).
   */
  public static HRegion openHRegion(final HRegionInfo info,
      final HTableDescriptor htd, final WAL wal,
      final Configuration conf)
  throws IOException {
    return openHRegion(info, htd, wal, conf, null, null);
  }
6115 
6116   
6117 
6118 
6119 
6120 
6121 
6122 
6123 
6124 
6125 
6126 
6127 
6128 
6129 
6130 
  /**
   * Opens a region, resolving the HBase root directory from the
   * configuration before delegating.
   */
  public static HRegion openHRegion(final HRegionInfo info,
    final HTableDescriptor htd, final WAL wal, final Configuration conf,
    final RegionServerServices rsServices,
    final CancelableProgressable reporter)
  throws IOException {
    return openHRegion(FSUtils.getRootDir(conf), info, htd, wal, conf, rsServices, reporter);
  }
6138 
6139   
6140 
6141 
6142 
6143 
6144 
6145 
6146 
6147 
6148 
6149 
6150 
6151 
  /**
   * Opens a region under an explicit root directory, without region-server
   * services or a progress reporter (delegates with nulls for both).
   */
  public static HRegion openHRegion(Path rootDir, final HRegionInfo info,
      final HTableDescriptor htd, final WAL wal, final Configuration conf)
  throws IOException {
    return openHRegion(rootDir, info, htd, wal, conf, null, null);
  }
6157 
6158   
6159 
6160 
6161 
6162 
6163 
6164 
6165 
6166 
6167 
6168 
6169 
6170 
6171 
6172 
6173   public static HRegion openHRegion(final Path rootDir, final HRegionInfo info,
6174       final HTableDescriptor htd, final WAL wal, final Configuration conf,
6175       final RegionServerServices rsServices,
6176       final CancelableProgressable reporter)
6177   throws IOException {
6178     FileSystem fs = null;
6179     if (rsServices != null) {
6180       fs = rsServices.getFileSystem();
6181     }
6182     if (fs == null) {
6183       fs = FileSystem.get(conf);
6184     }
6185     return openHRegion(conf, fs, rootDir, info, htd, wal, rsServices, reporter);
6186   }
6187 
6188   
6189 
6190 
6191 
6192 
6193 
6194 
6195 
6196 
6197 
6198 
6199 
6200 
6201 
  /**
   * Opens a region on an explicit filesystem, without region-server services
   * or a progress reporter (delegates with nulls for both).
   */
  public static HRegion openHRegion(final Configuration conf, final FileSystem fs,
      final Path rootDir, final HRegionInfo info, final HTableDescriptor htd, final WAL wal)
      throws IOException {
    return openHRegion(conf, fs, rootDir, info, htd, wal, null, null);
  }
6207 
6208   
6209 
6210 
6211 
6212 
6213 
6214 
6215 
6216 
6217 
6218 
6219 
6220 
6221 
6222 
6223 
  /**
   * Opens a region on an explicit filesystem, resolving the table directory
   * under {@code rootDir} before delegating.
   */
  public static HRegion openHRegion(final Configuration conf, final FileSystem fs,
      final Path rootDir, final HRegionInfo info, final HTableDescriptor htd, final WAL wal,
      final RegionServerServices rsServices, final CancelableProgressable reporter)
      throws IOException {
    Path tableDir = FSUtils.getTableDir(rootDir, info.getTable());
    return openHRegion(conf, fs, rootDir, tableDir, info, htd, wal, rsServices, reporter);
  }
6231 
6232   
6233 
6234 
6235 
6236 
6237 
6238 
6239 
6240 
6241 
6242 
6243 
6244 
6245 
6246 
6247 
6248   public static HRegion openHRegion(final Configuration conf, final FileSystem fs,
6249       final Path rootDir, final Path tableDir, final HRegionInfo info, final HTableDescriptor htd,
6250       final WAL wal, final RegionServerServices rsServices,
6251       final CancelableProgressable reporter)
6252       throws IOException {
6253     if (info == null) throw new NullPointerException("Passed region info is null");
6254     if (LOG.isDebugEnabled()) {
6255       LOG.debug("Opening region: " + info);
6256     }
6257     HRegion r = HRegion.newHRegion(tableDir, wal, fs, conf, info, htd, rsServices);
6258     return r.openHRegion(reporter);
6259   }
6260 
6261 
6262   
6263 
6264 
6265 
6266 
6267 
6268 
6269   public static HRegion openHRegion(final HRegion other, final CancelableProgressable reporter)
6270       throws IOException {
6271     HRegionFileSystem regionFs = other.getRegionFileSystem();
6272     HRegion r = newHRegion(regionFs.getTableDir(), other.getWAL(), regionFs.getFileSystem(),
6273         other.baseConf, other.getRegionInfo(), other.getTableDesc(), null);
6274     return r.openHRegion(reporter);
6275   }
6276 
  /**
   * Region-interface variant of {@link #openHRegion(HRegion,
   * CancelableProgressable)}; casts to the concrete HRegion and delegates.
   */
  public static Region openHRegion(final Region other, final CancelableProgressable reporter)
        throws IOException {
    return openHRegion((HRegion)other, reporter);
  }
6281 
6282   
6283 
6284 
6285 
6286 
6287 
  /**
   * Runs this region's open sequence: pre-flight configuration checks,
   * initialization, and (when appropriate) writing the region-open marker to
   * the WAL.
   * @param reporter progress callback, may be null
   * @return this region, opened
   * @throws IOException on any check or initialization failure
   */
  protected HRegion openHRegion(final CancelableProgressable reporter)
  throws IOException {
    // Fail fast if a family's compression codec is unusable on this server.
    checkCompressionCodecs();
    // Fail fast if a family's encryption configuration or key is unusable.
    checkEncryption();
    // Fail fast if the split policy or table coprocessor classes won't load.
    checkClassLoading();
    this.openSeqNum = initialize(reporter);
    this.setSequenceId(openSeqNum);
    if (wal != null && getRegionServerServices() != null && !writestate.readOnly
        && !isRecovering) {
      // Only write the open marker when we have a WAL and server services,
      // the region is writable, and it is not still recovering — during
      // recovery the marker is handled elsewhere.
      writeRegionOpenMarker(wal, openSeqNum);
    }
    return this;
  }
6308 
6309   public static void warmupHRegion(final HRegionInfo info,
6310       final HTableDescriptor htd, final WAL wal, final Configuration conf,
6311       final RegionServerServices rsServices,
6312       final CancelableProgressable reporter)
6313       throws IOException {
6314 
6315     if (info == null) throw new NullPointerException("Passed region info is null");
6316 
6317     if (LOG.isDebugEnabled()) {
6318       LOG.debug("HRegion.Warming up region: " + info);
6319     }
6320 
6321     Path rootDir = FSUtils.getRootDir(conf);
6322     Path tableDir = FSUtils.getTableDir(rootDir, info.getTable());
6323 
6324     FileSystem fs = null;
6325     if (rsServices != null) {
6326       fs = rsServices.getFileSystem();
6327     }
6328     if (fs == null) {
6329       fs = FileSystem.get(conf);
6330     }
6331 
6332     HRegion r = HRegion.newHRegion(tableDir, wal, fs, conf, info, htd, rsServices);
6333     r.initializeWarmup(reporter);
6334     r.close();
6335   }
6336 
6337 
6338   private void checkCompressionCodecs() throws IOException {
6339     for (HColumnDescriptor fam: this.htableDescriptor.getColumnFamilies()) {
6340       CompressionTest.testCompression(fam.getCompression());
6341       CompressionTest.testCompression(fam.getCompactionCompression());
6342     }
6343   }
6344 
6345   private void checkEncryption() throws IOException {
6346     for (HColumnDescriptor fam: this.htableDescriptor.getColumnFamilies()) {
6347       EncryptionTest.testEncryption(conf, fam.getEncryptionType(), fam.getEncryptionKey());
6348     }
6349   }
6350 
  /**
   * Verifies that the table's split-policy class and table coprocessor
   * classes can be loaded; throws otherwise.
   */
  private void checkClassLoading() throws IOException {
    RegionSplitPolicy.getSplitPolicyClass(this.htableDescriptor, conf);
    RegionCoprocessorHost.testTableCoprocessorAttrs(conf, this.htableDescriptor);
  }
6355 
6356   
6357 
6358 
6359 
6360 
  /**
   * Creates a daughter region instance after a split, committing its files
   * on the filesystem first.
   * @param hri info of the daughter region to create
   * @return the (unopened) daughter region
   * @throws IOException on filesystem failure
   */
  HRegion createDaughterRegionFromSplits(final HRegionInfo hri) throws IOException {
    // Move the daughter's files from the split working area into place.
    fs.commitDaughterRegion(hri);

    // Instantiate the daughter over the committed files.
    HRegion r = HRegion.newHRegion(this.fs.getTableDir(), this.getWAL(), fs.getFileSystem(),
        this.getBaseConf(), hri, this.getTableDesc(), rsServices);
    // Each daughter inherits half of the parent's request counters.
    r.readRequestsCount.set(this.getReadRequestsCount() / 2);
    r.writeRequestsCount.set(this.getWriteRequestsCount() / 2);
    return r;
  }
6372 
6373   
6374 
6375 
6376 
6377 
6378 
6379   HRegion createMergedRegionFromMerges(final HRegionInfo mergedRegionInfo,
6380       final HRegion region_b) throws IOException {
6381     HRegion r = HRegion.newHRegion(this.fs.getTableDir(), this.getWAL(),
6382         fs.getFileSystem(), this.getBaseConf(), mergedRegionInfo,
6383         this.getTableDesc(), this.rsServices);
6384     r.readRequestsCount.set(this.getReadRequestsCount()
6385         + region_b.getReadRequestsCount());
6386     r.writeRequestsCount.set(this.getWriteRequestsCount()
6387 
6388         + region_b.getWriteRequestsCount());
6389     this.fs.commitMergedRegion(mergedRegionInfo);
6390     return r;
6391   }
6392 
6393   
6394 
6395 
6396 
6397 
6398 
6399 
6400 
6401 
6402 
6403   
6404   public static void addRegionToMETA(final HRegion meta, final HRegion r) throws IOException {
6405     meta.checkResources();
6406     
6407     byte[] row = r.getRegionInfo().getRegionName();
6408     final long now = EnvironmentEdgeManager.currentTime();
6409     final List<Cell> cells = new ArrayList<Cell>(2);
6410     cells.add(new KeyValue(row, HConstants.CATALOG_FAMILY,
6411       HConstants.REGIONINFO_QUALIFIER, now,
6412       r.getRegionInfo().toByteArray()));
6413     
6414     cells.add(new KeyValue(row, HConstants.CATALOG_FAMILY,
6415       HConstants.META_VERSION_QUALIFIER, now,
6416       Bytes.toBytes(HConstants.META_VERSION)));
6417     meta.put(row, HConstants.CATALOG_FAMILY, cells);
6418   }
6419 
6420   
6421 
6422 
6423 
6424 
6425 
6426 
  /**
   * Computes the region directory as {@code tabledir/name}.
   * @deprecated kept for compatibility; region directory layout is managed
   *             by HRegionFileSystem.
   */
  @Deprecated
  public static Path getRegionDir(final Path tabledir, final String name) {
    return new Path(tabledir, name);
  }
6431 
6432   
6433 
6434 
6435 
6436 
6437 
6438 
  /**
   * Computes the region directory under the root dir from the region's table
   * and encoded name.
   * @deprecated kept for tests; region directory layout is managed by
   *             HRegionFileSystem.
   */
  @Deprecated
  @VisibleForTesting
  public static Path getRegionDir(final Path rootdir, final HRegionInfo info) {
    return new Path(
      FSUtils.getTableDir(rootdir, info.getTable()), info.getEncodedName());
  }
6445 
6446   
6447 
6448 
6449 
6450 
6451 
6452 
6453 
6454   public static boolean rowIsInRange(HRegionInfo info, final byte [] row) {
6455     return ((info.getStartKey().length == 0) ||
6456         (Bytes.compareTo(info.getStartKey(), row) <= 0)) &&
6457         ((info.getEndKey().length == 0) ||
6458             (Bytes.compareTo(info.getEndKey(), row) > 0));
6459   }
6460 
6461   
6462 
6463 
6464 
6465 
6466 
  /**
   * Merges two adjacent regions, ordering them by start key before
   * delegating to {@link #merge(HRegion, HRegion)}.
   * @return the new merged region
   * @throws IOException when the regions are non-adjacent or the merge fails
   */
  public static HRegion mergeAdjacent(final HRegion srcA, final HRegion srcB)
  throws IOException {
    HRegion a = srcA;
    HRegion b = srcB;

    // Order the pair so that 'a' is the region with the lower start key.
    // NOTE(review): HBase start keys are normally empty byte arrays rather
    // than null; these null checks look defensive — confirm what callers
    // actually pass.
    if (srcA.getRegionInfo().getStartKey() == null) {
      if (srcB.getRegionInfo().getStartKey() == null) {
        throw new IOException("Cannot merge two regions with null start key");
      }
      // A's start key is null and B's isn't: keep A first.
    } else if ((srcB.getRegionInfo().getStartKey() == null) ||
      (Bytes.compareTo(srcA.getRegionInfo().getStartKey(),
        srcB.getRegionInfo().getStartKey()) > 0)) {
      a = srcB;
      b = srcA;
    }

    // Adjacency: a's end key must exactly equal b's start key.
    if (!(Bytes.compareTo(a.getRegionInfo().getEndKey(),
        b.getRegionInfo().getStartKey()) == 0)) {
      throw new IOException("Cannot merge non-adjacent regions");
    }
    return merge(a, b);
  }
6492 
6493   
6494 
6495 
6496 
6497 
6498 
6499 
6500 
6501   public static HRegion merge(final HRegion a, final HRegion b) throws IOException {
6502     if (!a.getRegionInfo().getTable().equals(b.getRegionInfo().getTable())) {
6503       throw new IOException("Regions do not belong to the same table");
6504     }
6505 
6506     FileSystem fs = a.getRegionFileSystem().getFileSystem();
6507     
6508     a.flush(true);
6509     b.flush(true);
6510 
6511     
6512     a.compact(true);
6513     if (LOG.isDebugEnabled()) {
6514       LOG.debug("Files for region: " + a);
6515       a.getRegionFileSystem().logFileSystemState(LOG);
6516     }
6517     b.compact(true);
6518     if (LOG.isDebugEnabled()) {
6519       LOG.debug("Files for region: " + b);
6520       b.getRegionFileSystem().logFileSystemState(LOG);
6521     }
6522 
6523     RegionMergeTransactionImpl rmt = new RegionMergeTransactionImpl(a, b, true);
6524     if (!rmt.prepare(null)) {
6525       throw new IOException("Unable to merge regions " + a + " and " + b);
6526     }
6527     HRegionInfo mergedRegionInfo = rmt.getMergedRegionInfo();
6528     LOG.info("starting merge of regions: " + a + " and " + b
6529         + " into new region " + mergedRegionInfo.getRegionNameAsString()
6530         + " with start key <"
6531         + Bytes.toStringBinary(mergedRegionInfo.getStartKey())
6532         + "> and end key <"
6533         + Bytes.toStringBinary(mergedRegionInfo.getEndKey()) + ">");
6534     HRegion dstRegion;
6535     try {
6536       dstRegion = (HRegion)rmt.execute(null, null);
6537     } catch (IOException ioe) {
6538       rmt.rollback(null, null);
6539       throw new IOException("Failed merging region " + a + " and " + b
6540           + ", and successfully rolled back");
6541     }
6542     dstRegion.compact(true);
6543 
6544     if (LOG.isDebugEnabled()) {
6545       LOG.debug("Files for new region");
6546       dstRegion.getRegionFileSystem().logFileSystemState(LOG);
6547     }
6548 
6549     if (dstRegion.getRegionFileSystem().hasReferences(dstRegion.getTableDesc())) {
6550       throw new IOException("Merged region " + dstRegion
6551           + " still has references after the compaction, is compaction canceled?");
6552     }
6553 
6554     
6555     HFileArchiver.archiveRegion(a.getBaseConf(), fs, a.getRegionInfo());
6556     
6557     HFileArchiver.archiveRegion(b.getBaseConf(), fs, b.getRegionInfo());
6558 
6559     LOG.info("merge completed. New region is " + dstRegion);
6560     return dstRegion;
6561   }
6562 
6563   @Override
6564   public Result get(final Get get) throws IOException {
6565     checkRow(get.getRow(), "Get");
6566     
6567     if (get.hasFamilies()) {
6568       for (byte [] family: get.familySet()) {
6569         checkFamily(family);
6570       }
6571     } else { 
6572       for (byte[] family: this.htableDescriptor.getFamiliesKeys()) {
6573         get.addFamily(family);
6574       }
6575     }
6576     List<Cell> results = get(get, true);
6577     boolean stale = this.getRegionInfo().getReplicaId() != 0;
6578     return Result.create(results, get.isCheckExistenceOnly() ? !results.isEmpty() : null, stale);
6579   }
6580 
6581   @Override
6582   public List<Cell> get(Get get, boolean withCoprocessor) throws IOException {
6583 
6584     List<Cell> results = new ArrayList<Cell>();
6585 
6586     
6587     if (withCoprocessor && (coprocessorHost != null)) {
6588        if (coprocessorHost.preGet(get, results)) {
6589          return results;
6590        }
6591     }
6592 
6593     Scan scan = new Scan(get);
6594 
6595     RegionScanner scanner = null;
6596     try {
6597       scanner = getScanner(scan);
6598       scanner.next(results);
6599     } finally {
6600       if (scanner != null)
6601         scanner.close();
6602     }
6603 
6604     
6605     if (withCoprocessor && (coprocessorHost != null)) {
6606       coprocessorHost.postGet(get, results);
6607     }
6608 
6609     
6610     if (this.metricsRegion != null) {
6611       long totalSize = 0L;
6612       for (Cell cell : results) {
6613         totalSize += CellUtil.estimatedSerializedSizeOf(cell);
6614       }
6615       this.metricsRegion.updateGet(totalSize);
6616     }
6617 
6618     return results;
6619   }
6620 
  /**
   * Applies a batch of mutations to a single row atomically by delegating to
   * the multi-row path with just that row locked.
   */
  @Override
  public void mutateRow(RowMutations rm) throws IOException {
    mutateRowsWithLocks(rm.getMutations(), Collections.singleton(rm.getRow()));
  }
6626 
6627   
6628 
6629 
6630 
  /**
   * Convenience overload: applies the mutations under the given row locks
   * without nonces (delegates with NO_NONCE for group and nonce).
   */
  public void mutateRowsWithLocks(Collection<Mutation> mutations,
      Collection<byte[]> rowsToLock) throws IOException {
    mutateRowsWithLocks(mutations, rowsToLock, HConstants.NO_NONCE, HConstants.NO_NONCE);
  }
6635 
6636   
6637 
6638 
6639 
6640 
6641 
6642 
6643 
6644 
6645 
6646 
6647 
  /**
   * Applies the mutations atomically under the given row locks by wrapping
   * them in a MultiRowMutationProcessor and running it with no timeout (-1).
   * @param nonceGroup nonce group for idempotence tracking
   * @param nonce nonce for idempotence tracking
   */
  @Override
  public void mutateRowsWithLocks(Collection<Mutation> mutations,
      Collection<byte[]> rowsToLock, long nonceGroup, long nonce) throws IOException {
    MultiRowMutationProcessor proc = new MultiRowMutationProcessor(mutations, rowsToLock);
    processRowsWithLocks(proc, -1, nonceGroup, nonce);
  }
6654 
6655   
6656 
6657 
6658   public ClientProtos.RegionLoadStats getRegionStats() {
6659     if (!regionStatsEnabled) {
6660       return null;
6661     }
6662     ClientProtos.RegionLoadStats.Builder stats = ClientProtos.RegionLoadStats.newBuilder();
6663     stats.setMemstoreLoad((int) (Math.min(100, (this.memstoreSize.get() * 100) / this
6664         .memstoreFlushSize)));
6665     stats.setHeapOccupancy((int)rsServices.getHeapMemoryManager().getHeapOccupancyPercent()*100);
6666     return stats.build();
6667   }
6668 
  /**
   * Runs a row processor with the default row-processor timeout and no
   * nonces.
   */
  @Override
  public void processRowsWithLocks(RowProcessor<?,?> processor) throws IOException {
    processRowsWithLocks(processor, rowProcessorTimeout, HConstants.NO_NONCE,
      HConstants.NO_NONCE);
  }
6674 
  /**
   * Runs a row processor with the default row-processor timeout and the
   * given nonces.
   */
  @Override
  public void processRowsWithLocks(RowProcessor<?,?> processor, long nonceGroup, long nonce)
      throws IOException {
    processRowsWithLocks(processor, rowProcessorTimeout, nonceGroup, nonce);
  }
6680 
  /**
   * Runs the given {@link RowProcessor} against this region: validates the
   * target rows, acquires row locks plus the shared updates lock, applies the
   * mutations the processor produces to the memstore, appends and syncs the
   * WAL, and rolls the memstore back if the WAL sync fails.
   *
   * @param processor supplies the rows to lock and the mutation logic
   * @param timeout milliseconds allowed for processor.process(); negative means no deadline
   * @param nonceGroup nonce group recorded in the WAL for replay de-duplication
   * @param nonce nonce recorded in the WAL for replay de-duplication
   * @throws IOException on WAL append/sync failure or processor failure
   */
  @Override
  public void processRowsWithLocks(RowProcessor<?,?> processor, long timeout,
      long nonceGroup, long nonce) throws IOException {

    for (byte[] row : processor.getRowsToLock()) {
      checkRow(row, "processRowsWithLocks");
    }
    if (!processor.readOnly()) {
      checkReadOnly();
    }
    checkResources();

    startRegionOperation();
    WALEdit walEdit = new WALEdit();

    // Give the processor a pre-hook before any locks are taken; close the
    // region operation ourselves on failure since the main try is not yet open.
    try {
      processor.preProcess(this, walEdit);
    } catch (IOException e) {
      closeRegionOperation();
      throw e;
    }
    // Read-only processors touch neither memstore nor WAL: run and return.
    if (processor.readOnly()) {
      try {
        long now = EnvironmentEdgeManager.currentTime();
        doProcessRowWithTimeout(
            processor, now, this, null, null, timeout);
        processor.postProcess(this, walEdit, true);
      } finally {
        closeRegionOperation();
      }
      return;
    }

    MultiVersionConsistencyControl.WriteEntry writeEntry = null;
    boolean locked;
    boolean walSyncSuccessful = false;
    List<RowLock> acquiredRowLocks;
    long addedSize = 0;
    List<Mutation> mutations = new ArrayList<Mutation>();
    List<Cell> memstoreCells = new ArrayList<Cell>();
    Collection<byte[]> rowsToLock = processor.getRowsToLock();
    long mvccNum = 0;
    WALKey walKey = null;
    try {
      // 1. Row locks first, then the shared updates lock.
      acquiredRowLocks = new ArrayList<RowLock>(rowsToLock.size());
      for (byte[] row : rowsToLock) {
        acquiredRowLocks.add(getRowLock(row));
      }
      // Weight the updates-lock wait by the number of rows involved (min 1).
      lock(this.updatesLock.readLock(), acquiredRowLocks.size() == 0 ? 1 : acquiredRowLocks.size());
      locked = true;
      // Reserve an MVCC write number up front so all cells share one sequence id.
      mvccNum = MultiVersionConsistencyControl.getPreAssignedWriteNumber(this.sequenceId);

      long now = EnvironmentEdgeManager.currentTime();
      try {
        // 2. Let the processor build its mutations and WAL edit under the locks,
        // bounded by the timeout.
        doProcessRowWithTimeout(
            processor, now, this, mutations, walEdit, timeout);

        if (!mutations.isEmpty()) {
          // 3. Open the MVCC transaction only once we know there is work to do.
          writeEntry = mvcc.beginMemstoreInsertWithSeqNum(mvccNum);
          processor.preBatchMutate(this, walEdit);
          // 4. Apply every cell of every mutation to its store's memstore.
          for (Mutation m : mutations) {
            // Rewrite cell tags (e.g. TTL/visibility) before insertion.
            rewriteCellTags(m.getFamilyCellMap(), m);

            for (CellScanner cellScanner = m.cellScanner(); cellScanner.advance();) {
              Cell cell = cellScanner.current();
              CellUtil.setSequenceId(cell, mvccNum);
              Store store = getStore(cell);
              if (store == null) {
                // checkFamily throws NoSuchColumnFamilyException here; the
                // fall-through store.add below is presumed unreachable when
                // store is null -- NOTE(review): confirm checkFamily always throws.
                checkFamily(CellUtil.cloneFamily(cell));
              }
              Pair<Long, Cell> ret = store.add(cell);
              addedSize += ret.getFirst();
              memstoreCells.add(ret.getSecond());
            }
          }

          long txid = 0;
          // 5. Append (but do not yet sync) the WAL edit.
          if (!walEdit.isEmpty()) {
            walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
              this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now,
              processor.getClusterIds(), nonceGroup, nonce);
            txid = this.wal.append(this.htableDescriptor, this.getRegionInfo(),
              walKey, walEdit, getSequenceId(), true, memstoreCells);
          }
          if(walKey == null){
            // No WAL content: append an empty edit so a sequence id is still
            // assigned for the MVCC completion below.
            walKey = this.appendEmptyEdit(this.wal, memstoreCells);
          }
          // 6. Release the updates lock before the (potentially slow) WAL sync.
          if (locked) {
            this.updatesLock.readLock().unlock();
            locked = false;
          }

          // 7. Row locks can also be dropped before syncing.
          releaseRowLocks(acquiredRowLocks);

          // 8. Sync the WAL (txid == 0 means nothing was appended).
          if (txid != 0) {
            syncOrDefer(txid, getEffectiveDurability(processor.useDurability()));
          }
          walSyncSuccessful = true;
          // 9. Post-batch hook runs only after a durable write.
          processor.postBatchMutate(this);
        }
      } finally {
        // If the edit reached the memstore but never became durable in the WAL,
        // roll every inserted cell back and cancel the MVCC transaction so
        // readers never observe the half-applied batch.
        if (!mutations.isEmpty() && !walSyncSuccessful) {
          LOG.warn("Wal sync failed. Roll back " + mutations.size() +
              " memstore keyvalues for row(s):" + StringUtils.byteToHexString(
              processor.getRowsToLock().iterator().next()) + "...");
          for (Mutation m : mutations) {
            for (CellScanner cellScanner = m.cellScanner(); cellScanner.advance();) {
              Cell cell = cellScanner.current();
              getStore(cell).rollback(cell);
            }
          }
          if (writeEntry != null) {
            mvcc.cancelMemstoreInsert(writeEntry);
            writeEntry = null;
          }
        }
        // 10. Complete the MVCC transaction, making the batch visible to readers.
        if (writeEntry != null) {
          mvcc.completeMemstoreInsertWithSeqNum(writeEntry, walKey);
        }
        if (locked) {
          this.updatesLock.readLock().unlock();
        }
        // Safe even when already released on the success path above.
        releaseRowLocks(acquiredRowLocks);
      }

      // 11. Post-process hook, outside all locks.
      processor.postProcess(this, walEdit, walSyncSuccessful);

    } finally {
      closeRegionOperation();
      if (!mutations.isEmpty() &&
          isFlushSize(this.addAndGetGlobalMemstoreSize(addedSize))) {
        requestFlush();
      }
    }
  }
6843 
6844   private void doProcessRowWithTimeout(final RowProcessor<?,?> processor,
6845                                        final long now,
6846                                        final HRegion region,
6847                                        final List<Mutation> mutations,
6848                                        final WALEdit walEdit,
6849                                        final long timeout) throws IOException {
6850     
6851     if (timeout < 0) {
6852       try {
6853         processor.process(now, region, mutations, walEdit);
6854       } catch (IOException e) {
6855         LOG.warn("RowProcessor:" + processor.getClass().getName() +
6856             " throws Exception on row(s):" +
6857             Bytes.toStringBinary(
6858               processor.getRowsToLock().iterator().next()) + "...", e);
6859         throw e;
6860       }
6861       return;
6862     }
6863 
6864     
6865     FutureTask<Void> task =
6866       new FutureTask<Void>(new Callable<Void>() {
6867         @Override
6868         public Void call() throws IOException {
6869           try {
6870             processor.process(now, region, mutations, walEdit);
6871             return null;
6872           } catch (IOException e) {
6873             LOG.warn("RowProcessor:" + processor.getClass().getName() +
6874                 " throws Exception on row(s):" +
6875                 Bytes.toStringBinary(
6876                     processor.getRowsToLock().iterator().next()) + "...", e);
6877             throw e;
6878           }
6879         }
6880       });
6881     rowProcessorExecutor.execute(task);
6882     try {
6883       task.get(timeout, TimeUnit.MILLISECONDS);
6884     } catch (TimeoutException te) {
6885       LOG.error("RowProcessor timeout:" + timeout + " ms on row(s):" +
6886           Bytes.toStringBinary(processor.getRowsToLock().iterator().next()) +
6887           "...");
6888       throw new IOException(te);
6889     } catch (Exception e) {
6890       throw new IOException(e);
6891     }
6892   }
6893 
6894   public Result append(Append append) throws IOException {
6895     return append(append, HConstants.NO_NONCE, HConstants.NO_NONCE);
6896   }
6897 
6898   
6899   
6900   
6901 
  /**
   * Performs an atomic Append: for each targeted column, reads the current
   * value under the row lock, concatenates the appended bytes onto it, writes
   * the combined cell to the memstore and WAL, and returns the new values.
   *
   * @param append the append operation (row, per-family cells, durability, TTL)
   * @param nonceGroup nonce group recorded in the WAL for replay de-duplication
   * @param nonce nonce recorded in the WAL for replay de-duplication
   * @return the resulting cells when append.isReturnResults(), else null
   * @throws IOException on WAL failure or validation error
   */
  @Override
  public Result append(Append append, long nonceGroup, long nonce) throws IOException {
    byte[] row = append.getRow();
    checkRow(row, "append");
    boolean flush = false;
    Durability durability = getEffectiveDurability(append.getDurability());
    boolean writeToWAL = durability != Durability.SKIP_WAL;
    WALEdit walEdits = null;
    List<Cell> allKVs = new ArrayList<Cell>(append.size());
    Map<Store, List<Cell>> tempMemstore = new HashMap<Store, List<Cell>>();
    long size = 0;
    long txid = 0;

    checkReadOnly();
    checkResources();
    startRegionOperation(Operation.APPEND);
    this.writeRequestsCount.increment();
    long mvccNum = 0;
    WriteEntry writeEntry = null;
    WALKey walKey = null;
    RowLock rowLock = null;
    List<Cell> memstoreCells = new ArrayList<Cell>();
    boolean doRollBackMemstore = false;
    try {
      rowLock = getRowLock(row);
      try {
        lock(this.updatesLock.readLock());
        try {
          // Wait for in-flight transactions so the read below sees a complete
          // view of earlier writes.
          mvcc.waitForPreviousTransactionsComplete();
          if (this.coprocessorHost != null) {
            // A coprocessor may short-circuit the append entirely.
            Result r = this.coprocessorHost.preAppendAfterRowLock(append);
            if(r!= null) {
              return r;
            }
          }
          // Reserve the MVCC write number all new cells will carry.
          mvccNum = MultiVersionConsistencyControl.getPreAssignedWriteNumber(this.sequenceId);
          writeEntry = mvcc.beginMemstoreInsertWithSeqNum(mvccNum);
          long now = EnvironmentEdgeManager.currentTime();
          // Build the combined (old value + appended bytes) cells per family.
          for (Map.Entry<byte[], List<Cell>> family : append.getFamilyCellMap().entrySet()) {

            Store store = stores.get(family.getKey());
            List<Cell> kvs = new ArrayList<Cell>(family.getValue().size());

            // Sort the incoming cells so they can be matched positionally
            // against the sorted Get results below.
            Collections.sort(family.getValue(), store.getComparator());
            // Read the current values for every targeted qualifier.
            Get get = new Get(row);
            for (Cell cell : family.getValue()) {
              get.addColumn(family.getKey(), CellUtil.cloneQualifier(cell));
            }
            List<Cell> results = get(get, false);

            // Walk the sorted inputs alongside the sorted results; idx tracks
            // the next unconsumed existing cell.
            int idx = 0;
            for (Cell cell : family.getValue()) {
              Cell newCell;
              Cell oldCell = null;
              if (idx < results.size()
                  && CellUtil.matchingQualifier(results.get(idx), cell)) {
                oldCell = results.get(idx);
                long ts = Math.max(now, oldCell.getTimestamp());

                // Merge tags from the old cell, the new cell, and the TTL.
                List<Tag> tags = Tag.carryForwardTags(null, oldCell);
                tags = Tag.carryForwardTags(tags, cell);
                tags = carryForwardTTLTag(tags, append);

                byte[] tagBytes = Tag.fromList(tags);

                // Allocate a KeyValue big enough for old value + appended value.
                newCell = new KeyValue(row.length, cell.getFamilyLength(),
                    cell.getQualifierLength(), ts, KeyValue.Type.Put,
                    oldCell.getValueLength() + cell.getValueLength(),
                    tagBytes == null? 0: tagBytes.length);
                // Copy row / family / qualifier into the new cell.
                System.arraycopy(cell.getRowArray(), cell.getRowOffset(),
                  newCell.getRowArray(), newCell.getRowOffset(), cell.getRowLength());
                System.arraycopy(cell.getFamilyArray(), cell.getFamilyOffset(),
                  newCell.getFamilyArray(), newCell.getFamilyOffset(),
                  cell.getFamilyLength());
                System.arraycopy(cell.getQualifierArray(), cell.getQualifierOffset(),
                  newCell.getQualifierArray(), newCell.getQualifierOffset(),
                  cell.getQualifierLength());
                // Old value first, then the appended bytes immediately after.
                System.arraycopy(oldCell.getValueArray(), oldCell.getValueOffset(),
                  newCell.getValueArray(), newCell.getValueOffset(),
                  oldCell.getValueLength());
                System.arraycopy(cell.getValueArray(), cell.getValueOffset(),
                  newCell.getValueArray(),
                  newCell.getValueOffset() + oldCell.getValueLength(),
                  cell.getValueLength());
                // Finally the merged tags, if any.
                if (tagBytes != null) {
                  System.arraycopy(tagBytes, 0, newCell.getTagsArray(), newCell.getTagsOffset(),
                    tagBytes.length);
                }
                idx++;
              } else {
                // No existing value for this qualifier: the appended cell
                // becomes the first version.
                CellUtil.updateLatestStamp(cell, now);

                if (append.getTTL() != Long.MAX_VALUE) {
                  // Rebuild the cell so the TTL tag can be attached.
                  newCell = new KeyValue(cell.getRowArray(), cell.getRowOffset(),
                      cell.getRowLength(),
                    cell.getFamilyArray(), cell.getFamilyOffset(),
                      cell.getFamilyLength(),
                    cell.getQualifierArray(), cell.getQualifierOffset(),
                      cell.getQualifierLength(),
                    cell.getTimestamp(), KeyValue.Type.codeToType(cell.getTypeByte()),
                    cell.getValueArray(), cell.getValueOffset(), cell.getValueLength(),
                    carryForwardTTLTag(append));
                } else {
                  newCell = cell;
                }
              }

              CellUtil.setSequenceId(newCell, mvccNum);
              // Allow coprocessors to rewrite the cell before it hits the WAL.
              if (coprocessorHost != null) {
                newCell = coprocessorHost.postMutationBeforeWAL(RegionObserver.MutationType.APPEND,
                    append, oldCell, newCell);
              }
              kvs.add(newCell);

              // Stage the cell for the WAL edit when durability requires it.
              if (writeToWAL) {
                if (walEdits == null) {
                  walEdits = new WALEdit();
                }
                walEdits.add(newCell);
              }
            }

            // Stage per-store so the memstore insert below is grouped by store.
            tempMemstore.put(store, kvs);
          }

          // Insert staged cells into the memstore.
          for (Map.Entry<Store, List<Cell>> entry : tempMemstore.entrySet()) {
            Store store = entry.getKey();
            if (store.getFamily().getMaxVersions() == 1) {
              // Single-version family: upsert replaces in place.
              // NOTE(review): this path does not set doRollBackMemstore -- it
              // mirrors the add() path otherwise; confirm rollback semantics.
              size += store.upsert(entry.getValue(), getSmallestReadPoint());
              memstoreCells.addAll(entry.getValue());
            } else {
              // Multi-version family: plain adds, which can be rolled back.
              for (Cell cell: entry.getValue()) {
                Pair<Long, Cell> ret = store.add(cell);
                size += ret.getFirst();
                memstoreCells.add(ret.getSecond());
                doRollBackMemstore = true;
              }
            }
            allKVs.addAll(entry.getValue());
          }

          // Append (but do not yet sync) the WAL entry while still locked.
          if (writeToWAL) {
            walKey = new HLogKey(getRegionInfo().getEncodedNameAsBytes(),
              this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, nonceGroup, nonce);
            txid = this.wal.append(this.htableDescriptor, getRegionInfo(), walKey, walEdits,
              this.sequenceId, true, memstoreCells);
          } else {
            recordMutationWithoutWal(append.getFamilyCellMap());
          }
          if (walKey == null) {
            // SKIP_WAL: still obtain a sequence id via an empty edit so the
            // MVCC completion in the finally block has a key to use.
            walKey = this.appendEmptyEdit(this.wal, memstoreCells);
          }
          size = this.addAndGetGlobalMemstoreSize(size);
          flush = isFlushSize(size);
        } finally {
          this.updatesLock.readLock().unlock();
        }
      } finally {
        rowLock.release();
        rowLock = null;
      }
      // Sync outside the locks; txid == 0 means nothing was appended.
      if(txid != 0){
        syncOrDefer(txid, durability);
      }
      doRollBackMemstore = false;
    } finally {
      if (rowLock != null) {
        rowLock.release();
      }
      // If we got here without clearing doRollBackMemstore, the WAL sync
      // failed: undo the memstore inserts and cancel the MVCC transaction.
      if (doRollBackMemstore) {
        rollbackMemstore(memstoreCells);
        if (writeEntry != null) mvcc.cancelMemstoreInsert(writeEntry);
      } else if (writeEntry != null) {
        mvcc.completeMemstoreInsertWithSeqNum(writeEntry, walKey);
      }

      closeRegionOperation(Operation.APPEND);
    }

    if (this.metricsRegion != null) {
      this.metricsRegion.updateAppend();
    }

    if (flush) {
      // The memstore crossed the flush threshold during this append.
      requestFlush();
    }


    return append.isReturnResults() ? Result.create(allKVs) : null;
  }
7133 
7134   public Result increment(Increment increment) throws IOException {
7135     return increment(increment, HConstants.NO_NONCE, HConstants.NO_NONCE);
7136   }
7137 
7138   
7139   
7140   
7141 
7142   @Override
7143   public Result increment(Increment increment, long nonceGroup, long nonce)
7144   throws IOException {
7145     checkReadOnly();
7146     checkResources();
7147     checkRow(increment.getRow(), "increment");
7148     checkFamilies(increment.getFamilyCellMap().keySet());
7149     startRegionOperation(Operation.INCREMENT);
7150     this.writeRequestsCount.increment();
7151     try {
7152       return doIncrement(increment, nonceGroup, nonce);
7153     } finally {
7154       if (this.metricsRegion != null) this.metricsRegion.updateIncrement();
7155       closeRegionOperation(Operation.INCREMENT);
7156     }
7157   }
7158 
  /**
   * Core of the atomic increment: under the row lock and updates lock, reads
   * current values, computes the incremented cells, writes them to the
   * memstore and WAL, and rolls back the memstore if the WAL sync fails.
   *
   * @return the resulting cells when increment.isReturnResults(), else null
   * @throws IOException on WAL failure or validation error
   */
  private Result doIncrement(Increment increment, long nonceGroup, long nonce) throws IOException {
    RowLock rowLock = null;
    WriteEntry writeEntry = null;
    WALKey walKey = null;
    boolean doRollBackMemstore = false;
    long accumulatedResultSize = 0;
    List<Cell> allKVs = new ArrayList<Cell>(increment.size());
    List<Cell> memstoreCells = new ArrayList<Cell>();
    Durability effectiveDurability = getEffectiveDurability(increment.getDurability());
    try {
      rowLock = getRowLock(increment.getRow());
      long txid = 0;
      try {
        lock(this.updatesLock.readLock());
        try {
          // Wait for in-flight transactions so the read below observes every
          // earlier write to this row.
          this.mvcc.waitForPreviousTransactionsComplete();
          if (this.coprocessorHost != null) {
            // A coprocessor may short-circuit the increment entirely.
            Result r = this.coprocessorHost.preIncrementAfterRowLock(increment);
            if (r != null) return r;
          }
          // Reserve the MVCC write number all new cells will carry.
          long mvccNum = MultiVersionConsistencyControl.getPreAssignedWriteNumber(this.sequenceId);
          writeEntry = this.mvcc.beginMemstoreInsertWithSeqNum(mvccNum);

          long now = EnvironmentEdgeManager.currentTime();
          final boolean writeToWAL = effectiveDurability != Durability.SKIP_WAL;
          WALEdit walEdits = null;
          // Compute and apply the incremented cells family by family.
          for (Map.Entry<byte [], List<Cell>> entry: increment.getFamilyCellMap().entrySet()) {
            byte [] columnFamilyName = entry.getKey();
            List<Cell> increments = entry.getValue();
            Store store = this.stores.get(columnFamilyName);
            // sort() orders the deltas so they can be matched positionally
            // against the sorted current values.
            List<Cell> results = applyIncrementsToColumnFamily(increment, columnFamilyName,
                sort(increments, store.getComparator()), now, mvccNum, allKVs, null);
            if (!results.isEmpty()) {
              // Stage the new cells for the WAL edit when durability requires it.
              if (writeToWAL) {
                int resultsSize = results.size();
                for (int i = 0; i < resultsSize; i++) {
                  if (walEdits == null) walEdits = new WALEdit();
                  walEdits.add(results.get(i));
                }
              }
              // Insert into the memstore.
              if (store.getFamily().getMaxVersions() == 1) {
                // Single-version family: upsert replaces in place.
                // NOTE(review): this path does not set doRollBackMemstore;
                // confirm upsert rollback semantics against the add() path.
                accumulatedResultSize += store.upsert(results, getSmallestReadPoint());
                memstoreCells.addAll(results);
              } else {
                // Multi-version family: plain adds, which can be rolled back.
                for (Cell cell: results) {
                  Pair<Long, Cell> ret = store.add(cell);
                  accumulatedResultSize += ret.getFirst();
                  memstoreCells.add(ret.getSecond());
                  doRollBackMemstore = true;
                }
              }
            }
          }

          // Append (but do not yet sync) the WAL entry while still locked.
          if (walEdits != null && !walEdits.isEmpty()) {
            if (writeToWAL) {
              walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
                this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, nonceGroup, nonce);
              txid = this.wal.append(this.htableDescriptor, this.getRegionInfo(),
                walKey, walEdits, getSequenceId(), true, memstoreCells);
            } else {
              recordMutationWithoutWal(increment.getFamilyCellMap());
            }
          }
          if (walKey == null) {
            // No WAL content: append an empty edit so a sequence id is still
            // assigned for the MVCC completion in the finally block.
            walKey = this.appendEmptyEdit(this.wal, memstoreCells);
          }
        } finally {
          this.updatesLock.readLock().unlock();
        }
      } finally {
        rowLock.release();
        rowLock = null;
      }
      // Sync outside the locks; txid == 0 means nothing was appended.
      if (txid != 0) syncOrDefer(txid, effectiveDurability);
      doRollBackMemstore = false;
    } finally {
      if (rowLock != null) rowLock.release();
      // Undo memstore inserts if the WAL sync failed before we cleared the flag.
      if (doRollBackMemstore) rollbackMemstore(memstoreCells);
      if (writeEntry != null) mvcc.completeMemstoreInsertWithSeqNum(writeEntry, walKey);
    }
    // Request a flush if this increment pushed the memstore over the threshold.
    if (isFlushSize(this.addAndGetGlobalMemstoreSize(accumulatedResultSize))) requestFlush();
    return increment.isReturnResults() ? Result.create(allKVs) : null;
  }
7264 
7265   
7266 
7267 
  /**
   * Sorts {@code cells} in place with {@code comparator} and returns the same
   * (now sorted) list, convenient for call chaining.
   */
  private static List<Cell> sort(List<Cell> cells, final Comparator<Cell> comparator) {
    Collections.sort(cells, comparator);
    return cells;
  }
7272 
7273   
7274 
7275 
7276 
7277 
7278 
7279 
7280 
7281 
7282 
7283 
  /**
   * Computes the post-increment cells for one column family: reads the current
   * values, adds each delta to its matching current value (if any), carries
   * tags and TTL forward, and builds the new KeyValues.
   *
   * @param increment the overall increment operation (row, time range, TTL)
   * @param columnFamilyName family these deltas belong to
   * @param sortedIncrements deltas, already sorted by the store comparator
   * @param now current time used as the minimum timestamp for new cells
   * @param mvccNum sequence id to stamp on new cells (skipped when NO_WRITE_NUMBER)
   * @param allKVs out-param collecting every computed cell, including no-op (+0) ones
   * @param isolation isolation level for the read of current values, or null
   * @return only the cells that must be written back (delta != 0)
   * @throws IOException if a value is not 8 bytes wide or the read fails
   */
  private List<Cell> applyIncrementsToColumnFamily(Increment increment, byte[] columnFamilyName,
      List<Cell> sortedIncrements, long now, long mvccNum, List<Cell> allKVs,
      final IsolationLevel isolation)
  throws IOException {
    List<Cell> results = new ArrayList<Cell>(sortedIncrements.size());
    byte [] row = increment.getRow();
    // Current stored values for every targeted qualifier, in sorted order.
    List<Cell> currentValues =
        getIncrementCurrentValue(increment, columnFamilyName, sortedIncrements, isolation);
    // Walk the sorted deltas alongside the sorted current values; idx tracks
    // the next unconsumed current value.
    int idx = 0;
    for (int i = 0; i < sortedIncrements.size(); i++) {
      Cell inc = sortedIncrements.get(i);
      long incrementAmount = getLongValue(inc);
      // A zero delta reads the value but writes nothing back.
      boolean writeBack = (incrementAmount != 0);
      // Start with the tags carried on the delta cell itself.
      List<Tag> tags = Tag.carryForwardTags(inc);

      Cell currentValue = null;
      long ts = now;
      if (idx < currentValues.size() && CellUtil.matchingQualifier(currentValues.get(idx), inc)) {
        currentValue = currentValues.get(idx);
        ts = Math.max(now, currentValue.getTimestamp());
        incrementAmount += getLongValue(currentValue);
        // Preserve the existing cell's tags on the new version.
        tags = Tag.carryForwardTags(tags, currentValue);
        // Only consume this current value when the next delta targets a
        // different qualifier (duplicate deltas reuse the same current value).
        if (i < (sortedIncrements.size() - 1) &&
            !CellUtil.matchingQualifier(inc, sortedIncrements.get(i + 1))) idx++;
      }

      // Build the new cell carrying the summed value and merged tags.
      byte [] qualifier = CellUtil.cloneQualifier(inc);
      byte [] incrementAmountInBytes = Bytes.toBytes(incrementAmount);
      tags = carryForwardTTLTag(tags, increment);

      Cell newValue = new KeyValue(row, 0, row.length,
        columnFamilyName, 0, columnFamilyName.length,
        qualifier, 0, qualifier.length,
        ts, KeyValue.Type.Put,
        incrementAmountInBytes, 0, incrementAmountInBytes.length,
        tags);

      // Stamp the MVCC sequence id unless the caller asked for none.
      if (mvccNum != MultiVersionConsistencyControl.NO_WRITE_NUMBER) {
        CellUtil.setSequenceId(newValue, mvccNum);
      }

      // Coprocessors may rewrite the cell before it reaches the WAL.
      if (coprocessorHost != null) {
        newValue = coprocessorHost.postMutationBeforeWAL(
            RegionObserver.MutationType.INCREMENT, increment, currentValue, newValue);
      }
      allKVs.add(newValue);
      if (writeBack) {
        results.add(newValue);
      }
    }
    return results;
  }
7346 
7347   
7348 
7349 
7350 
7351   private static long getLongValue(final Cell cell) throws DoNotRetryIOException {
7352     int len = cell.getValueLength();
7353     if (len != Bytes.SIZEOF_LONG) {
7354       
7355       throw new DoNotRetryIOException("Field is not a long, it's " + len + " bytes wide");
7356     }
7357     return Bytes.toLong(cell.getValueArray(), cell.getValueOffset(), len);
7358   }
7359 
7360   
7361 
7362 
7363 
7364 
7365 
7366 
7367 
7368 
7369   private List<Cell> getIncrementCurrentValue(final Increment increment, byte [] columnFamily,
7370       final List<Cell> increments, final IsolationLevel isolation)
7371   throws IOException {
7372     Get get = new Get(increment.getRow());
7373     if (isolation != null) get.setIsolationLevel(isolation);
7374     for (Cell cell: increments) {
7375       get.addColumn(columnFamily, CellUtil.cloneQualifier(cell));
7376     }
7377     TimeRange tr = increment.getTimeRange();
7378     get.setTimeRange(tr.getMin(), tr.getMax());
7379     return get(get, false);
7380   }
7381 
  /** Convenience overload: builds a fresh tag list carrying only the mutation's TTL, if set. */
  private static List<Tag> carryForwardTTLTag(final Mutation mutation) {
    return carryForwardTTLTag(null, mutation);
  }
7385 
7386   
7387 
7388 
7389   private static List<Tag> carryForwardTTLTag(final List<Tag> tagsOrNull,
7390       final Mutation mutation) {
7391     long ttl = mutation.getTTL();
7392     if (ttl == Long.MAX_VALUE) return tagsOrNull;
7393     List<Tag> tags = tagsOrNull;
7394     
7395     
7396     
7397     if (tags == null) tags = new ArrayList<Tag>(1);
7398     tags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(ttl)));
7399     return tags;
7400   }
7401 
7402   
7403   
7404   
7405 
7406   private void checkFamily(final byte [] family)
7407   throws NoSuchColumnFamilyException {
7408     if (!this.htableDescriptor.hasFamily(family)) {
7409       throw new NoSuchColumnFamilyException("Column family " +
7410           Bytes.toString(family) + " does not exist in region " + this
7411           + " in table " + this.htableDescriptor);
7412     }
7413   }
7414 
  // Shallow heap footprint of an HRegion instance: object header, one array
  // reference, 45 object references, 3 ints, 14 longs and 5 booleans, aligned.
  // NOTE(review): these counts are maintained by hand against the field list --
  // re-verify whenever fields are added or removed.
  public static final long FIXED_OVERHEAD = ClassSize.align(
      ClassSize.OBJECT +
      ClassSize.ARRAY +
      45 * ClassSize.REFERENCE + 3 * Bytes.SIZEOF_INT +
      (14 * Bytes.SIZEOF_LONG) +
      5 * Bytes.SIZEOF_BOOLEAN);
7421 
7422   
7423   
7424   
7425   
7426   
7427   
7428   
7429   
7430   
7431   
  // Deep heap footprint: FIXED_OVERHEAD plus the fixed-size structures the
  // region always allocates (locks, atomics, maps, MVCC state), excluding the
  // stores themselves, which heapSize() adds separately.
  // NOTE(review): which term accounts for which field cannot be confirmed from
  // this chunk alone -- verify against the field declarations before editing.
  public static final long DEEP_OVERHEAD = FIXED_OVERHEAD +
      ClassSize.OBJECT + // one plain lock object
      (2 * ClassSize.ATOMIC_BOOLEAN) + // two AtomicBoolean flags
      (3 * ClassSize.ATOMIC_LONG) + // three AtomicLong counters
      (2 * ClassSize.CONCURRENT_HASHMAP) +  // two ConcurrentHashMaps
      WriteState.HEAP_SIZE + // writestate
      ClassSize.CONCURRENT_SKIPLISTMAP + ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + // skip-list map
      (2 * ClassSize.REENTRANT_LOCK) + // two reentrant locks
      MultiVersionConsistencyControl.FIXED_SIZE // mvcc
      + ClassSize.TREEMAP // one TreeMap
      + 2 * ClassSize.ATOMIC_INTEGER // two AtomicInteger counters
      ;
7444 
7445   @Override
7446   public long heapSize() {
7447     long heapSize = DEEP_OVERHEAD;
7448     for (Store store : this.stores.values()) {
7449       heapSize += store.heapSize();
7450     }
7451     
7452     return heapSize;
7453   }
7454 
7455   
7456 
7457 
7458 
7459   private static void printUsageAndExit(final String message) {
7460     if (message != null && message.length() > 0) System.out.println(message);
7461     System.out.println("Usage: HRegion CATALOG_TABLE_DIR [major_compact]");
7462     System.out.println("Options:");
7463     System.out.println(" major_compact  Pass this option to major compact " +
7464       "passed region.");
7465     System.out.println("Default outputs scan of passed region.");
7466     System.exit(1);
7467   }
7468 
7469   @Override
7470   public boolean registerService(Service instance) {
7471     
7472 
7473 
7474     Descriptors.ServiceDescriptor serviceDesc = instance.getDescriptorForType();
7475     if (coprocessorServiceHandlers.containsKey(serviceDesc.getFullName())) {
7476       LOG.error("Coprocessor service "+serviceDesc.getFullName()+
7477           " already registered, rejecting request from "+instance
7478       );
7479       return false;
7480     }
7481 
7482     coprocessorServiceHandlers.put(serviceDesc.getFullName(), instance);
7483     if (LOG.isDebugEnabled()) {
7484       LOG.debug("Registered coprocessor service: region=" +
7485           Bytes.toStringBinary(getRegionInfo().getRegionName()) +
7486           " service=" + serviceDesc.getFullName());
7487     }
7488     return true;
7489   }
7490 
  /**
   * Dispatches a coprocessor endpoint RPC: looks up the registered service and
   * method, decodes the request, runs pre/post coprocessor hooks around the
   * invocation, and returns the response message.
   *
   * @throws UnknownProtocolException if the service or method is not registered
   * @throws IOException if the request cannot be parsed or the hooks fail
   */
  @Override
  public Message execService(RpcController controller, CoprocessorServiceCall call)
      throws IOException {
    String serviceName = call.getServiceName();
    String methodName = call.getMethodName();
    if (!coprocessorServiceHandlers.containsKey(serviceName)) {
      throw new UnknownProtocolException(null,
          "No registered coprocessor service found for name "+serviceName+
          " in region "+Bytes.toStringBinary(getRegionInfo().getRegionName()));
    }

    Service service = coprocessorServiceHandlers.get(serviceName);
    Descriptors.ServiceDescriptor serviceDesc = service.getDescriptorForType();
    Descriptors.MethodDescriptor methodDesc = serviceDesc.findMethodByName(methodName);
    if (methodDesc == null) {
      throw new UnknownProtocolException(service.getClass(),
          "Unknown method "+methodName+" called on service "+serviceName+
              " in region "+Bytes.toStringBinary(getRegionInfo().getRegionName()));
    }

    // Decode the serialized request into the method's request prototype.
    Message.Builder builder = service.getRequestPrototype(methodDesc).newBuilderForType();
    ProtobufUtil.mergeFrom(builder, call.getRequest());
    Message request = builder.build();

    // The pre-hook may rewrite the request.
    if (coprocessorHost != null) {
      request = coprocessorHost.preEndpointInvocation(service, methodName, request);
    }

    // The callback merges whatever the service produces into the response
    // builder; a null message leaves the response empty.
    final Message.Builder responseBuilder =
        service.getResponsePrototype(methodDesc).newBuilderForType();
    service.callMethod(methodDesc, controller, request, new RpcCallback<Message>() {
      @Override
      public void run(Message message) {
        if (message != null) {
          responseBuilder.mergeFrom(message);
        }
      }
    });

    // The post-hook may adjust the response before it is built.
    if (coprocessorHost != null) {
      coprocessorHost.postEndpointInvocation(service, methodName, request, responseBuilder);
    }

    return responseBuilder.build();
  }
7536 
7537   
7538 
7539 
7540 
7541 
  /**
   * Opens the catalog (hbase:meta) region found at path {@code p} and either
   * major-compacts it or dumps its full contents to the log, depending on
   * {@code majorCompact}. Only the meta table is accepted; any other path
   * results in an IOException. The region is always closed on exit.
   *
   * @param fs filesystem holding the region
   * @param p table directory; must resolve to hbase:meta
   * @param walFactory supplies the WAL for the meta region
   * @param c configuration used to load table descriptors and open the region
   * @param majorCompact if true run a major compaction instead of a scan/dump
   * @throws IOException if the path is not the meta table or region ops fail
   */
  private static void processTable(final FileSystem fs, final Path p,
      final WALFactory walFactory, final Configuration c,
      final boolean majorCompact)
  throws IOException {
    HRegion region;
    FSTableDescriptors fst = new FSTableDescriptors(c);
    // Only the catalog (meta) table is supported here.
    if (FSUtils.getTableName(p).equals(TableName.META_TABLE_NAME)) {
      final WAL wal = walFactory.getMetaWAL(
          HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes());
      region = HRegion.newHRegion(p, wal, fs, c,
        HRegionInfo.FIRST_META_REGIONINFO, fst.get(TableName.META_TABLE_NAME), null);
    } else {
      throw new IOException("Not a known catalog table: " + p.toString());
    }
    try {
      region.initialize(null);
      if (majorCompact) {
        region.compact(true);
      } else {
        // Dump mode: scan every cell and log each batch.
        Scan scan = new Scan();
        // Scanner over the whole region; closed in the finally below.
        RegionScanner scanner = region.getScanner(scan);
        try {
          List<Cell> kvs = new ArrayList<Cell>();
          boolean done;
          do {
            kvs.clear();
            done = scanner.next(kvs);
            if (kvs.size() > 0) LOG.info(kvs);
          } while (done);
        } finally {
          scanner.close();
        }
      }
    } finally {
      region.close();
    }
  }
7582 
7583   boolean shouldForceSplit() {
7584     return this.splitRequest;
7585   }
7586 
7587   byte[] getExplicitSplitPoint() {
7588     return this.explicitSplitPoint;
7589   }
7590 
7591   void forceSplit(byte[] sp) {
7592     
7593     
7594     this.splitRequest = true;
7595     if (sp != null) {
7596       this.explicitSplitPoint = sp;
7597     }
7598   }
7599 
7600   void clearSplit() {
7601     this.splitRequest = false;
7602     this.explicitSplitPoint = null;
7603   }
7604 
7605   
7606 
7607 
7608   protected void prepareToSplit() {
7609     
7610   }
7611 
7612   
7613 
7614 
7615 
7616 
7617 
7618   public byte[] checkSplit() {
7619     
7620     if (this.getRegionInfo().isMetaTable() ||
7621         TableName.NAMESPACE_TABLE_NAME.equals(this.getRegionInfo().getTable())) {
7622       if (shouldForceSplit()) {
7623         LOG.warn("Cannot split meta region in HBase 0.20 and above");
7624       }
7625       return null;
7626     }
7627 
7628     
7629     if (this.isRecovering()) {
7630       LOG.info("Cannot split region " + this.getRegionInfo().getEncodedName() + " in recovery.");
7631       return null;
7632     }
7633 
7634     if (!splitPolicy.shouldSplit()) {
7635       return null;
7636     }
7637 
7638     byte[] ret = splitPolicy.getSplitPoint();
7639 
7640     if (ret != null) {
7641       try {
7642         checkRow(ret, "calculated split");
7643       } catch (IOException e) {
7644         LOG.error("Ignoring invalid split", e);
7645         return null;
7646       }
7647     }
7648     return ret;
7649   }
7650 
7651   
7652 
7653 
7654   public int getCompactPriority() {
7655     int count = Integer.MAX_VALUE;
7656     for (Store store : stores.values()) {
7657       count = Math.min(count, store.getCompactPriority());
7658     }
7659     return count;
7660   }
7661 
7662 
7663   
7664   @Override
7665   public RegionCoprocessorHost getCoprocessorHost() {
7666     return coprocessorHost;
7667   }
7668 
7669   
7670   public void setCoprocessorHost(final RegionCoprocessorHost coprocessorHost) {
7671     this.coprocessorHost = coprocessorHost;
7672   }
7673 
7674   @Override
7675   public void startRegionOperation() throws IOException {
7676     startRegionOperation(Operation.ANY);
7677   }
7678 
  /**
   * Validates that the region may serve the given operation and, except for
   * merge/split/compact requests, acquires the region read lock. On success the
   * lock is held and the coprocessor host has been notified; callers must pair
   * this with {@link #closeRegionOperation(Operation)}. Throws
   * NotServingRegionException if the region is closing/closed and
   * RegionInRecoveryException if the operation is not allowed while recovering.
   */
  @Override
  public void startRegionOperation(Operation op) throws IOException {
    switch (op) {
    case GET:  // fall through: reads additionally require reads to be enabled
    case SCAN:
      checkReadsEnabled();
      // NOTE: intentional fall-through into the recovery check below.
    case INCREMENT: // fall through
    case APPEND:
    case SPLIT_REGION:
    case MERGE_REGION:
    case PUT:
    case DELETE:
    case BATCH_MUTATE:
    case COMPACT_REGION:
      // While recovering, reject everything except PUT/DELETE/BATCH_MUTATE
      // (and reject those too when writes are disallowed in recovery).
      if (isRecovering() && (this.disallowWritesInRecovering ||
              (op != Operation.PUT && op != Operation.DELETE && op != Operation.BATCH_MUTATE))) {
        throw new RegionInRecoveryException(getRegionInfo().getRegionNameAsString() +
          " is recovering; cannot take reads");
      }
      break;
    default:
      break;
    }
    if (op == Operation.MERGE_REGION || op == Operation.SPLIT_REGION
        || op == Operation.COMPACT_REGION) {
      // These operations manage their own region-level locking, so do not take
      // the read lock (and do not notify the coprocessor host) here.
      return;
    }
    if (this.closing.get()) {
      throw new NotServingRegionException(getRegionInfo().getRegionNameAsString() + " is closing");
    }
    lock(lock.readLock());
    if (this.closed.get()) {
      // Region closed after we got the lock; release it before failing.
      lock.readLock().unlock();
      throw new NotServingRegionException(getRegionInfo().getRegionNameAsString() + " is closed");
    }
    try {
      if (coprocessorHost != null) {
        coprocessorHost.postStartRegionOperation(op);
      }
    } catch (Exception e) {
      // Hook failed: undo the lock so the caller is not left holding it.
      lock.readLock().unlock();
      throw new IOException(e);
    }
  }
7726 
7727   @Override
7728   public void closeRegionOperation() throws IOException {
7729     closeRegionOperation(Operation.ANY);
7730   }
7731 
7732   
7733 
7734 
7735 
7736 
7737   public void closeRegionOperation(Operation operation) throws IOException {
7738     lock.readLock().unlock();
7739     if (coprocessorHost != null) {
7740       coprocessorHost.postCloseRegionOperation(operation);
7741     }
7742   }
7743 
7744   
7745 
7746 
7747 
7748 
7749 
7750 
7751 
7752 
  /**
   * Acquires the region lock (write or read, per {@code writeLockNeeded}) for a
   * bulk operation, refusing if the region is closing or closed. If the region
   * turns out to be closed after the lock is obtained, the lock is released
   * before the exception is thrown. Pair with {@link #closeBulkRegionOperation()}.
   *
   * @param writeLockNeeded true to take the exclusive write lock
   * @throws NotServingRegionException if the region is closing or closed
   * @throws RegionTooBusyException if the lock cannot be acquired in time
   * @throws InterruptedIOException if interrupted while waiting for the lock
   */
  private void startBulkRegionOperation(boolean writeLockNeeded)
      throws NotServingRegionException, RegionTooBusyException, InterruptedIOException {
    if (this.closing.get()) {
      throw new NotServingRegionException(getRegionInfo().getRegionNameAsString() + " is closing");
    }
    if (writeLockNeeded) lock(lock.writeLock());
    else lock(lock.readLock());
    if (this.closed.get()) {
      // Closed after acquisition: undo whichever lock we took, then fail.
      if (writeLockNeeded) lock.writeLock().unlock();
      else lock.readLock().unlock();
      throw new NotServingRegionException(getRegionInfo().getRegionNameAsString() + " is closed");
    }
  }
7766 
7767   
7768 
7769 
7770 
7771   private void closeBulkRegionOperation(){
7772     if (lock.writeLock().isHeldByCurrentThread()) lock.writeLock().unlock();
7773     else lock.readLock().unlock();
7774   }
7775 
7776   
7777 
7778 
7779 
7780   private void recordMutationWithoutWal(final Map<byte [], List<Cell>> familyMap) {
7781     numMutationsWithoutWAL.increment();
7782     if (numMutationsWithoutWAL.get() <= 1) {
7783       LOG.info("writing data to region " + this +
7784                " with WAL disabled. Data may be lost in the event of a crash.");
7785     }
7786 
7787     long mutationSize = 0;
7788     for (List<Cell> cells: familyMap.values()) {
7789       assert cells instanceof RandomAccess;
7790       int listSize = cells.size();
7791       for (int i=0; i < listSize; i++) {
7792         Cell cell = cells.get(i);
7793         
7794         mutationSize += KeyValueUtil.keyLength(cell) + cell.getValueLength();
7795       }
7796     }
7797 
7798     dataInMemoryWithoutWAL.add(mutationSize);
7799   }
7800 
7801   private void lock(final Lock lock)
7802       throws RegionTooBusyException, InterruptedIOException {
7803     lock(lock, 1);
7804   }
7805 
7806   
7807 
7808 
7809 
7810 
7811   private void lock(final Lock lock, final int multiplier)
7812       throws RegionTooBusyException, InterruptedIOException {
7813     try {
7814       final long waitTime = Math.min(maxBusyWaitDuration,
7815           busyWaitDuration * Math.min(multiplier, maxBusyWaitMultiplier));
7816       if (!lock.tryLock(waitTime, TimeUnit.MILLISECONDS)) {
7817         throw new RegionTooBusyException(
7818             "failed to get a lock in " + waitTime + " ms. " +
7819                 "regionName=" + (this.getRegionInfo() == null ? "unknown" :
7820                 this.getRegionInfo().getRegionNameAsString()) +
7821                 ", server=" + (this.getRegionServerServices() == null ? "unknown" :
7822                 this.getRegionServerServices().getServerName()));
7823       }
7824     } catch (InterruptedException ie) {
7825       LOG.info("Interrupted while waiting for a lock");
7826       InterruptedIOException iie = new InterruptedIOException();
7827       iie.initCause(ie);
7828       throw iie;
7829     }
7830   }
7831 
7832   
7833 
7834 
7835 
7836 
7837 
  /**
   * Syncs the WAL up to the given transaction id, or defers/skips the sync,
   * according to the requested durability. Edits to the meta region are always
   * synced regardless of the requested durability.
   *
   * @param txid the WAL transaction id to sync up to
   * @param durability the durability requested for this edit
   * @throws IOException if the WAL sync fails
   */
  private void syncOrDefer(long txid, Durability durability) throws IOException {
    if (this.getRegionInfo().isMetaRegion()) {
      this.wal.sync(txid);
    } else {
      switch(durability) {
      case USE_DEFAULT:
        // Defer to the region-level default (see shouldSyncWAL()).
        if (shouldSyncWAL()) {
          this.wal.sync(txid);
        }
        break;
      case SKIP_WAL:
        // Nothing was appended, so there is nothing to sync.
        break;
      case ASYNC_WAL:
        // The WAL will be flushed out of band; no explicit sync here.
        break;
      case SYNC_WAL:
      case FSYNC_WAL:
        // NOTE(review): FSYNC_WAL is handled identically to SYNC_WAL here;
        // a true fsync is presumably not implemented at this layer.
        this.wal.sync(txid);
        break;
      }
    }
  }
7863 
7864   
7865 
7866 
7867   private boolean shouldSyncWAL() {
7868     return durability.ordinal() >  Durability.ASYNC_WAL.ordinal();
7869   }
7870 
7871   
7872 
7873 
  /**
   * A write-discarding stand-in for a Cell list, used where an API demands a
   * collection to append to but the caller has no use for the contents:
   * add/addAll silently drop elements, size() is always 0, and get() always
   * throws since nothing is ever retained.
   */
  private static final List<Cell> MOCKED_LIST = new AbstractList<Cell>() {

    @Override
    public void add(int index, Cell element) {
      // Intentionally discard the element.
    }

    @Override
    public boolean addAll(int index, Collection<? extends Cell> c) {
      return false; // nothing stored, so the list is never modified
    }

    @Override
    public KeyValue get(int index) {
      throw new UnsupportedOperationException();
    }

    @Override
    public int size() {
      return 0;
    }
  };
7896 
7897   
7898 
7899 
7900 
7901 
7902 
7903 
7904 
7905 
7906   public static void main(String[] args) throws IOException {
7907     if (args.length < 1) {
7908       printUsageAndExit(null);
7909     }
7910     boolean majorCompact = false;
7911     if (args.length > 1) {
7912       if (!args[1].toLowerCase().startsWith("major")) {
7913         printUsageAndExit("ERROR: Unrecognized option <" + args[1] + ">");
7914       }
7915       majorCompact = true;
7916     }
7917     final Path tableDir = new Path(args[0]);
7918     final Configuration c = HBaseConfiguration.create();
7919     final FileSystem fs = FileSystem.get(c);
7920     final Path logdir = new Path(c.get("hbase.tmp.dir"));
7921     final String logname = "wal" + FSUtils.getTableName(tableDir) + System.currentTimeMillis();
7922 
7923     final Configuration walConf = new Configuration(c);
7924     FSUtils.setRootDir(walConf, logdir);
7925     final WALFactory wals = new WALFactory(walConf, null, logname);
7926     try {
7927       processTable(fs, tableDir, wals, c, majorCompact);
7928     } finally {
7929        wals.close();
7930        
7931        BlockCache bc = new CacheConfig(c).getBlockCache();
7932        if (bc != null) bc.shutdown();
7933     }
7934   }
7935 
7936   @Override
7937   public long getOpenSeqNum() {
7938     return this.openSeqNum;
7939   }
7940 
7941   @Override
7942   public Map<byte[], Long> getMaxStoreSeqId() {
7943     return this.maxSeqIdInStores;
7944   }
7945 
7946   @Override
7947   public long getOldestSeqIdOfStore(byte[] familyName) {
7948     return wal.getEarliestMemstoreSeqNum(getRegionInfo()
7949         .getEncodedNameAsBytes(), familyName);
7950   }
7951 
7952   @Override
7953   public CompactionState getCompactionState() {
7954     boolean hasMajor = majorInProgress.get() > 0, hasMinor = minorInProgress.get() > 0;
7955     return (hasMajor ? (hasMinor ? CompactionState.MAJOR_AND_MINOR : CompactionState.MAJOR)
7956         : (hasMinor ? CompactionState.MINOR : CompactionState.NONE));
7957   }
7958 
7959   public void reportCompactionRequestStart(boolean isMajor){
7960     (isMajor ? majorInProgress : minorInProgress).incrementAndGet();
7961   }
7962 
7963   public void reportCompactionRequestEnd(boolean isMajor, int numFiles, long filesSizeCompacted) {
7964     int newValue = (isMajor ? majorInProgress : minorInProgress).decrementAndGet();
7965 
7966     
7967     compactionsFinished.incrementAndGet();
7968     compactionNumFilesCompacted.addAndGet(numFiles);
7969     compactionNumBytesCompacted.addAndGet(filesSizeCompacted);
7970 
7971     assert newValue >= 0;
7972   }
7973 
7974   
7975 
7976 
7977 
7978   @VisibleForTesting
7979   public AtomicLong getSequenceId() {
7980     return this.sequenceId;
7981   }
7982 
7983   
7984 
7985 
7986 
7987   private void setSequenceId(long value) {
7988     this.sequenceId.set(value);
7989   }
7990 
  /**
   * Per-row lock bookkeeping: tracks the owning thread, a reentrancy count, and
   * a latch that waiters block on until the last lock on the row is released.
   * An instance lives in the {@code lockedRows} map while its row is locked.
   */
  @VisibleForTesting class RowLockContext {
    private final HashedBytes row;
    // Counted down once, when the final lock on this row is released.
    private final CountDownLatch latch = new CountDownLatch(1);
    // The thread that created (and therefore owns) this context.
    private final Thread thread;
    // Reentrancy count: number of outstanding RowLocks handed out.
    private int lockCount = 0;

    RowLockContext(HashedBytes row) {
      this.row = row;
      this.thread = Thread.currentThread();
    }

    /** @return true if the calling thread is the one that owns this context. */
    boolean ownedByCurrentThread() {
      return thread == Thread.currentThread();
    }

    /** Hands out another RowLock on this row, bumping the reentrancy count. */
    RowLock newLock() {
      lockCount++;
      RowLockImpl rl = new RowLockImpl();
      rl.setContext(this);
      return rl;
    }

    /**
     * Releases one lock on this row. Only the owning thread may release; when
     * the count reaches zero the context is removed from lockedRows and the
     * latch is opened so waiting threads can proceed.
     */
    void releaseLock() {
      if (!ownedByCurrentThread()) {
        throw new IllegalArgumentException("Lock held by thread: " + thread
          + " cannot be released by different thread: " + Thread.currentThread());
      }
      lockCount--;
      if (lockCount == 0) {
        // Last release: detach from the row map and wake up any waiters.
        RowLockContext existingContext = lockedRows.remove(row);
        if (existingContext != this) {
          throw new RuntimeException(
              "Internal row lock state inconsistent, should not happen, row: " + row);
        }
        latch.countDown();
      }
    }

    @Override
    public String toString() {
      return "RowLockContext{" +
        "row=" + row +
        ", count=" + lockCount +
        ", threadName=" + thread.getName() +
        '}';
    }

  }
8040 
8041   public static class RowLockImpl implements RowLock {
8042     private RowLockContext context;
8043     private boolean released = false;
8044 
8045     @VisibleForTesting
8046     public RowLockContext getContext() {
8047       return context;
8048     }
8049 
8050     @VisibleForTesting
8051     public void setContext(RowLockContext context) {
8052       this.context = context;
8053     }
8054 
8055     @Override
8056     public void release() {
8057       if (!released) {
8058         context.releaseLock();
8059       }
8060       released = true;
8061     }
8062   }
8063 
8064   
8065 
8066 
8067 
8068 
8069 
8070 
8071 
8072 
  /**
   * Appends an empty edit for this region to the given WAL and returns the
   * resulting WALKey. The key is created with NO_SEQUENCE_ID and a zero
   * timestamp; the WAL append is expected to assign the real sequence id.
   *
   * @param wal the WAL to append the empty edit to
   * @param cells cell list passed through to the append call
   * @return the WALKey created for the empty edit
   * @throws IOException if the WAL append fails
   */
  private WALKey appendEmptyEdit(final WAL wal, List<Cell> cells) throws IOException {
    // HLogKey is deprecated but still the concrete key type used here.
    @SuppressWarnings("deprecation")
    WALKey key = new HLogKey(getRegionInfo().getEncodedNameAsBytes(), getRegionInfo().getTable(),
      WALKey.NO_SEQUENCE_ID, 0, null, HConstants.NO_NONCE, HConstants.NO_NONCE);
    // Append an empty WALEdit under this key. NOTE(review): the final boolean
    // argument is false — presumably "not destined for the memstore"; confirm
    // against the WAL.append signature.
    wal.append(getTableDesc(), getRegionInfo(), key, WALEdit.EMPTY_WALEDIT, getSequenceId(), false,
      cells);
    return key;
  }
8084 
8085   
8086 
8087 
8088   @Override
8089   public void onConfigurationChange(Configuration conf) {
8090     
8091   }
8092 
8093   
8094 
8095 
8096   @Override
8097   public void registerChildren(ConfigurationManager manager) {
8098     configurationManager = Optional.of(manager);
8099     for (Store s : this.stores.values()) {
8100       configurationManager.get().registerObserver(s);
8101     }
8102   }
8103 
8104   
8105 
8106 
8107   @Override
8108   public void deregisterChildren(ConfigurationManager manager) {
8109     for (Store s : this.stores.values()) {
8110       configurationManager.get().deregisterObserver(s);
8111     }
8112   }
8113 
8114   
8115 
8116 
8117   public RegionSplitPolicy getSplitPolicy() {
8118     return this.splitPolicy;
8119   }
8120 
8121 
8122 }