View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.regionserver;
20  
21  import java.io.IOException;
22  import java.io.InterruptedIOException;
23  import java.security.PrivilegedExceptionAction;
24  import java.util.ArrayList;
25  import java.util.List;
26  import java.util.ListIterator;
27  import java.util.Map;
28  import java.util.concurrent.Callable;
29  import java.util.concurrent.ExecutionException;
30  import java.util.concurrent.Executors;
31  import java.util.concurrent.Future;
32  import java.util.concurrent.ThreadFactory;
33  import java.util.concurrent.ThreadPoolExecutor;
34  import java.util.concurrent.TimeUnit;
35  
36  import org.apache.commons.logging.Log;
37  import org.apache.commons.logging.LogFactory;
38  import org.apache.hadoop.fs.Path;
39  import org.apache.hadoop.hbase.classification.InterfaceAudience;
40  import org.apache.hadoop.hbase.HConstants;
41  import org.apache.hadoop.hbase.HRegionInfo;
42  import org.apache.hadoop.hbase.Server;
43  import org.apache.hadoop.hbase.ServerName;
44  import org.apache.hadoop.hbase.MetaTableAccessor;
45  import org.apache.hadoop.hbase.client.HConnection;
46  import org.apache.hadoop.hbase.client.Mutation;
47  import org.apache.hadoop.hbase.client.Put;
48  import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
49  import org.apache.hadoop.hbase.coordination.SplitTransactionCoordination;
50  import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
51  import org.apache.hadoop.hbase.security.User;
52  import org.apache.hadoop.hbase.util.Bytes;
53  import org.apache.hadoop.hbase.util.CancelableProgressable;
54  import org.apache.hadoop.hbase.util.ConfigUtil;
55  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
56  import org.apache.hadoop.hbase.util.FSUtils;
57  import org.apache.hadoop.hbase.util.HasThread;
58  import org.apache.hadoop.hbase.util.Pair;
59  import org.apache.hadoop.hbase.util.PairOfSameType;
60  import org.apache.zookeeper.KeeperException;
61  
62  import com.google.common.util.concurrent.ThreadFactoryBuilder;
63  
64  @InterfaceAudience.Private
65  public class SplitTransactionImpl implements SplitTransaction {
66    private static final Log LOG = LogFactory.getLog(SplitTransaction.class);
67  
68    /*
69     * Region to split
70     */
71    private final HRegion parent;
72    private HRegionInfo hri_a;
73    private HRegionInfo hri_b;
74    private long fileSplitTimeout = 30000;
75    public SplitTransactionCoordination.SplitTransactionDetails std;
76    boolean useZKForAssignment;
77  
78    /*
79     * Row to split around
80     */
81    private final byte [] splitrow;
82  
83    /*
84     * Transaction state for listener, only valid during execute and
85     * rollback
86     */
87    private SplitTransactionPhase currentPhase = SplitTransactionPhase.STARTED;
88    private Server server;
89    private RegionServerServices rsServices;
90  
91    public static class JournalEntryImpl implements JournalEntry {
92      private SplitTransactionPhase type;
93      private long timestamp;
94  
95      public JournalEntryImpl(SplitTransactionPhase type) {
96        this(type, EnvironmentEdgeManager.currentTime());
97      }
98  
99      public JournalEntryImpl(SplitTransactionPhase type, long timestamp) {
100       this.type = type;
101       this.timestamp = timestamp;
102     }
103 
104     @Override
105     public String toString() {
106       StringBuilder sb = new StringBuilder();
107       sb.append(type);
108       sb.append(" at ");
109       sb.append(timestamp);
110       return sb.toString();
111     }
112 
113     @Override
114     public SplitTransactionPhase getPhase() {
115       return type;
116     }
117 
118     @Override
119     public long getTimeStamp() {
120       return timestamp;
121     }
122   }
123 
124   /*
125    * Journal of how far the split transaction has progressed.
126    */
127   private final List<JournalEntry> journal = new ArrayList<JournalEntry>();
128 
129   /**
130    * Listeners
131    */
132   private final ArrayList<TransactionListener> listeners = new ArrayList<TransactionListener>();
133 
134   /**
135    * Constructor
136    * @param r Region to split
137    * @param splitrow Row to split around
138    */
139   public SplitTransactionImpl(final Region r, final byte [] splitrow) {
140     this.parent = (HRegion)r;
141     this.splitrow = splitrow;
142     this.journal.add(new JournalEntryImpl(SplitTransactionPhase.STARTED));
143     useZKForAssignment = ConfigUtil.useZKForAssignment(parent.getBaseConf());
144   }
145 
146   private void transition(SplitTransactionPhase nextPhase) throws IOException {
147     transition(nextPhase, false);
148   }
149 
150   private void transition(SplitTransactionPhase nextPhase, boolean isRollback)
151       throws IOException {
152     if (!isRollback) {
153       // Add to the journal first, because if the listener throws an exception
154       // we need to roll back starting at 'nextPhase'
155       this.journal.add(new JournalEntryImpl(nextPhase));
156     }
157     for (int i = 0; i < listeners.size(); i++) {
158       TransactionListener listener = listeners.get(i);
159       if (!isRollback) {
160         listener.transition(this, currentPhase, nextPhase);
161       } else {
162         listener.rollback(this, currentPhase, nextPhase);
163       }
164     }
165     currentPhase = nextPhase;
166   }
167 
168   /**
169    * Does checks on split inputs.
170    * @return <code>true</code> if the region is splittable else
171    * <code>false</code> if it is not (e.g. its already closed, etc.).
172    */
173   public boolean prepare() throws IOException {
174     if (!this.parent.isSplittable()) return false;
175     // Split key can be null if this region is unsplittable; i.e. has refs.
176     if (this.splitrow == null) return false;
177     HRegionInfo hri = this.parent.getRegionInfo();
178     parent.prepareToSplit();
179     // Check splitrow.
180     byte [] startKey = hri.getStartKey();
181     byte [] endKey = hri.getEndKey();
182     if (Bytes.equals(startKey, splitrow) ||
183         !this.parent.getRegionInfo().containsRow(splitrow)) {
184       LOG.info("Split row is not inside region key range or is equal to " +
185           "startkey: " + Bytes.toStringBinary(this.splitrow));
186       return false;
187     }
188     long rid = getDaughterRegionIdTimestamp(hri);
189     this.hri_a = new HRegionInfo(hri.getTable(), startKey, this.splitrow, false, rid);
190     this.hri_b = new HRegionInfo(hri.getTable(), this.splitrow, endKey, false, rid);
191 
192     transition(SplitTransactionPhase.PREPARED);
193 
194     return true;
195   }
196 
197   /**
198    * Calculate daughter regionid to use.
199    * @param hri Parent {@link HRegionInfo}
200    * @return Daughter region id (timestamp) to use.
201    */
202   private static long getDaughterRegionIdTimestamp(final HRegionInfo hri) {
203     long rid = EnvironmentEdgeManager.currentTime();
204     // Regionid is timestamp.  Can't be less than that of parent else will insert
205     // at wrong location in hbase:meta (See HBASE-710).
206     if (rid < hri.getRegionId()) {
207       LOG.warn("Clock skew; parent regions id is " + hri.getRegionId() +
208         " but current time here is " + rid);
209       rid = hri.getRegionId() + 1;
210     }
211     return rid;
212   }
213 
214   private static IOException closedByOtherException = new IOException(
215       "Failed to close region: already closed by another thread");
216 
217   /* package */PairOfSameType<Region> createDaughters(final Server server,
218       final RegionServerServices services) throws IOException {
219     return createDaughters(server, services, null);
220   }
221 
222   /**
223    * Prepare the regions and region files.
224    * @param server Hosting server instance.  Can be null when testing (won't try
225    * and update in zk if a null server)
226    * @param services Used to online/offline regions.
227    * @param user
228    * @throws IOException If thrown, transaction failed.
229    *    Call {@link #rollback(Server, RegionServerServices)}
230    * @return Regions created
231    */
232   /* package */PairOfSameType<Region> createDaughters(final Server server,
233       final RegionServerServices services, User user) throws IOException {
234     LOG.info("Starting split of region " + this.parent);
235     if ((server != null && server.isStopped()) ||
236         (services != null && services.isStopping())) {
237       throw new IOException("Server is stopped or stopping");
238     }
239     assert !this.parent.lock.writeLock().isHeldByCurrentThread():
240       "Unsafe to hold write lock while performing RPCs";
241 
242     transition(SplitTransactionPhase.BEFORE_PRE_SPLIT_HOOK);
243 
244     // Coprocessor callback
245     if (this.parent.getCoprocessorHost() != null) {
246       if (user == null) {
247         // TODO: Remove one of these
248         parent.getCoprocessorHost().preSplit();
249         parent.getCoprocessorHost().preSplit(splitrow);
250       } else {
251         try {
252           user.getUGI().doAs(new PrivilegedExceptionAction<Void>() {
253             @Override
254             public Void run() throws Exception {
255               parent.getCoprocessorHost().preSplit();
256               parent.getCoprocessorHost().preSplit(splitrow);
257               return null;
258             }
259           });
260         } catch (InterruptedException ie) {
261           InterruptedIOException iioe = new InterruptedIOException();
262           iioe.initCause(ie);
263           throw iioe;
264         }
265       }
266     }
267 
268     transition(SplitTransactionPhase.AFTER_PRE_SPLIT_HOOK);
269 
270     // If true, no cluster to write meta edits to or to update znodes in.
271     boolean testing = server == null? true:
272         server.getConfiguration().getBoolean("hbase.testing.nocluster", false);
273     this.fileSplitTimeout = testing ? this.fileSplitTimeout :
274         server.getConfiguration().getLong("hbase.regionserver.fileSplitTimeout",
275           this.fileSplitTimeout);
276 
277     PairOfSameType<Region> daughterRegions = stepsBeforePONR(server, services, testing);
278 
279     final List<Mutation> metaEntries = new ArrayList<Mutation>();
280     boolean ret = false;
281     if (this.parent.getCoprocessorHost() != null) {
282       if (user == null) {
283         ret = parent.getCoprocessorHost().preSplitBeforePONR(splitrow, metaEntries);
284       } else {
285         try {
286           ret = user.getUGI().doAs(new PrivilegedExceptionAction<Boolean>() {
287             @Override
288             public Boolean run() throws Exception {
289               return parent.getCoprocessorHost().preSplitBeforePONR(splitrow, metaEntries);
290             }
291           });
292         } catch (InterruptedException ie) {
293           InterruptedIOException iioe = new InterruptedIOException();
294           iioe.initCause(ie);
295           throw iioe;
296         }
297       }
298       if (ret) {
299           throw new IOException("Coprocessor bypassing region "
300             + this.parent.getRegionInfo().getRegionNameAsString() + " split.");
301       }
302       try {
303         for (Mutation p : metaEntries) {
304           HRegionInfo.parseRegionName(p.getRow());
305         }
306       } catch (IOException e) {
307         LOG.error("Row key of mutation from coprossor is not parsable as region name."
308             + "Mutations from coprocessor should only for hbase:meta table.");
309         throw e;
310       }
311     }
312 
313     // This is the point of no return.  Adding subsequent edits to .META. as we
314     // do below when we do the daughter opens adding each to .META. can fail in
315     // various interesting ways the most interesting of which is a timeout
316     // BUT the edits all go through (See HBASE-3872).  IF we reach the PONR
317     // then subsequent failures need to crash out this regionserver; the
318     // server shutdown processing should be able to fix-up the incomplete split.
319     // The offlined parent will have the daughters as extra columns.  If
320     // we leave the daughter regions in place and do not remove them when we
321     // crash out, then they will have their references to the parent in place
322     // still and the server shutdown fixup of .META. will point to these
323     // regions.
324     // We should add PONR JournalEntry before offlineParentInMeta,so even if
325     // OfflineParentInMeta timeout,this will cause regionserver exit,and then
326     // master ServerShutdownHandler will fix daughter & avoid data loss. (See
327     // HBase-4562).
328     transition(SplitTransactionPhase.PONR);
329 
330     // Edit parent in meta.  Offlines parent region and adds splita and splitb
331     // as an atomic update. See HBASE-7721. This update to META makes the region
332     // will determine whether the region is split or not in case of failures.
333     // If it is successful, master will roll-forward, if not, master will rollback
334     // and assign the parent region.
335     if (!testing && useZKForAssignment) {
336       if (metaEntries == null || metaEntries.isEmpty()) {
337         MetaTableAccessor.splitRegion(server.getConnection(),
338           parent.getRegionInfo(), daughterRegions.getFirst().getRegionInfo(),
339           daughterRegions.getSecond().getRegionInfo(), server.getServerName(),
340           parent.getTableDesc().getRegionReplication());
341       } else {
342         offlineParentInMetaAndputMetaEntries(server.getConnection(),
343           parent.getRegionInfo(), daughterRegions.getFirst().getRegionInfo(), daughterRegions
344               .getSecond().getRegionInfo(), server.getServerName(), metaEntries,
345               parent.getTableDesc().getRegionReplication());
346       }
347     } else if (services != null && !useZKForAssignment) {
348       if (!services.reportRegionStateTransition(TransitionCode.SPLIT_PONR,
349           parent.getRegionInfo(), hri_a, hri_b)) {
350         // Passed PONR, let SSH clean it up
351         throw new IOException("Failed to notify master that split passed PONR: "
352           + parent.getRegionInfo().getRegionNameAsString());
353       }
354     }
355     return daughterRegions;
356   }
357 
358   public PairOfSameType<Region> stepsBeforePONR(final Server server,
359       final RegionServerServices services, boolean testing) throws IOException {
360 
361     if (useCoordinatedStateManager(server)) {
362       if (std == null) {
363         std =
364             ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
365                 .getSplitTransactionCoordination().getDefaultDetails();
366       }
367       ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
368           .getSplitTransactionCoordination().startSplitTransaction(parent, server.getServerName(),
369             hri_a, hri_b);
370     } else if (services != null && !useZKForAssignment) {
371       if (!services.reportRegionStateTransition(TransitionCode.READY_TO_SPLIT,
372           parent.getRegionInfo(), hri_a, hri_b)) {
373         throw new IOException("Failed to get ok from master to split "
374           + parent.getRegionInfo().getRegionNameAsString());
375       }
376     }
377 
378     transition(SplitTransactionPhase.SET_SPLITTING);
379 
380     if (useCoordinatedStateManager(server)) {
381       ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
382           .getSplitTransactionCoordination().waitForSplitTransaction(services, parent, hri_a,
383             hri_b, std);
384     }
385 
386     this.parent.getRegionFileSystem().createSplitsDir();
387 
388     transition(SplitTransactionPhase.CREATE_SPLIT_DIR);
389 
390     Map<byte[], List<StoreFile>> hstoreFilesToSplit = null;
391     Exception exceptionToThrow = null;
392     try{
393       hstoreFilesToSplit = this.parent.close(false);
394     } catch (Exception e) {
395       exceptionToThrow = e;
396     }
397     if (exceptionToThrow == null && hstoreFilesToSplit == null) {
398       // The region was closed by a concurrent thread.  We can't continue
399       // with the split, instead we must just abandon the split.  If we
400       // reopen or split this could cause problems because the region has
401       // probably already been moved to a different server, or is in the
402       // process of moving to a different server.
403       exceptionToThrow = closedByOtherException;
404     }
405     if (exceptionToThrow != closedByOtherException) {
406       transition(SplitTransactionPhase.CLOSED_PARENT_REGION);
407     }
408     if (exceptionToThrow != null) {
409       if (exceptionToThrow instanceof IOException) throw (IOException)exceptionToThrow;
410       throw new IOException(exceptionToThrow);
411     }
412     if (!testing) {
413       services.removeFromOnlineRegions(this.parent, null);
414     }
415 
416     transition(SplitTransactionPhase.OFFLINED_PARENT);
417 
418     // TODO: If splitStoreFiles were multithreaded would we complete steps in
419     // less elapsed time?  St.Ack 20100920
420     //
421     // splitStoreFiles creates daughter region dirs under the parent splits dir
422     // Nothing to unroll here if failure -- clean up of CREATE_SPLIT_DIR will
423     // clean this up.
424     Pair<Integer, Integer> expectedReferences = splitStoreFiles(hstoreFilesToSplit);
425 
426     // Log to the journal that we are creating region A, the first daughter
427     // region.  We could fail halfway through.  If we do, we could have left
428     // stuff in fs that needs cleanup -- a storefile or two.  Thats why we
429     // add entry to journal BEFORE rather than AFTER the change.
430     transition(SplitTransactionPhase.STARTED_REGION_A_CREATION);
431 
432     assertReferenceFileCount(expectedReferences.getFirst(),
433         this.parent.getRegionFileSystem().getSplitsDir(this.hri_a));
434     Region a = this.parent.createDaughterRegionFromSplits(this.hri_a);
435     assertReferenceFileCount(expectedReferences.getFirst(),
436         new Path(this.parent.getRegionFileSystem().getTableDir(), this.hri_a.getEncodedName()));
437 
438     // Ditto
439     transition(SplitTransactionPhase.STARTED_REGION_B_CREATION);
440 
441     assertReferenceFileCount(expectedReferences.getSecond(),
442         this.parent.getRegionFileSystem().getSplitsDir(this.hri_b));
443     Region b = this.parent.createDaughterRegionFromSplits(this.hri_b);
444     assertReferenceFileCount(expectedReferences.getSecond(),
445         new Path(this.parent.getRegionFileSystem().getTableDir(), this.hri_b.getEncodedName()));
446 
447     return new PairOfSameType<Region>(a, b);
448   }
449 
450   void assertReferenceFileCount(int expectedReferenceFileCount, Path dir)
451       throws IOException {
452     if (expectedReferenceFileCount != 0 &&
453         expectedReferenceFileCount != FSUtils.getRegionReferenceFileCount(parent.getFilesystem(),
454           dir)) {
455       throw new IOException("Failing split. Expected reference file count isn't equal.");
456     }
457   }
458 
459   /**
460    * Perform time consuming opening of the daughter regions.
461    * @param server Hosting server instance.  Can be null when testing
462    * @param services Used to online/offline regions.
463    * @param a first daughter region
464    * @param a second daughter region
465    * @throws IOException If thrown, transaction failed.
466    *          Call {@link #rollback(Server, RegionServerServices)}
467    */
468   /* package */void openDaughters(final Server server,
469       final RegionServerServices services, Region a, Region b)
470       throws IOException {
471     boolean stopped = server != null && server.isStopped();
472     boolean stopping = services != null && services.isStopping();
473     // TODO: Is this check needed here?
474     if (stopped || stopping) {
475       LOG.info("Not opening daughters " +
476           b.getRegionInfo().getRegionNameAsString() +
477           " and " +
478           a.getRegionInfo().getRegionNameAsString() +
479           " because stopping=" + stopping + ", stopped=" + stopped);
480     } else {
481       // Open daughters in parallel.
482       DaughterOpener aOpener = new DaughterOpener(server, (HRegion)a);
483       DaughterOpener bOpener = new DaughterOpener(server, (HRegion)b);
484       aOpener.start();
485       bOpener.start();
486       try {
487         aOpener.join();
488         if (aOpener.getException() == null) {
489           transition(SplitTransactionPhase.OPENED_REGION_A);
490         }
491         bOpener.join();
492         if (bOpener.getException() == null) {
493           transition(SplitTransactionPhase.OPENED_REGION_B);
494         }
495       } catch (InterruptedException e) {
496         throw (InterruptedIOException)new InterruptedIOException().initCause(e);
497       }
498       if (aOpener.getException() != null) {
499         throw new IOException("Failed " +
500           aOpener.getName(), aOpener.getException());
501       }
502       if (bOpener.getException() != null) {
503         throw new IOException("Failed " +
504           bOpener.getName(), bOpener.getException());
505       }
506       if (services != null) {
507         try {
508           if (useZKForAssignment) {
509             // add 2nd daughter first (see HBASE-4335)
510             services.postOpenDeployTasks(b);
511           } else if (!services.reportRegionStateTransition(TransitionCode.SPLIT,
512               parent.getRegionInfo(), hri_a, hri_b)) {
513             throw new IOException("Failed to report split region to master: "
514               + parent.getRegionInfo().getShortNameToLog());
515           }
516           // Should add it to OnlineRegions
517           services.addToOnlineRegions(b);
518           if (useZKForAssignment) {
519             services.postOpenDeployTasks(a);
520           }
521           services.addToOnlineRegions(a);
522         } catch (KeeperException ke) {
523           throw new IOException(ke);
524         }
525       }
526     }
527   }
528 
529   public PairOfSameType<Region> execute(final Server server,
530     final RegionServerServices services)
531         throws IOException {
532     if (User.isHBaseSecurityEnabled(parent.getBaseConf())) {
533       LOG.warn("Should use execute(Server, RegionServerServices, User)");
534     }
535     return execute(server, services, null);
536   }
537 
538   /**
539    * Run the transaction.
540    * @param server Hosting server instance.  Can be null when testing
541    * @param services Used to online/offline regions.
542    * @throws IOException If thrown, transaction failed.
543    *          Call {@link #rollback(Server, RegionServerServices)}
544    * @return Regions created
545    * @throws IOException
546    * @see #rollback(Server, RegionServerServices)
547    */
548   @Override
549   public PairOfSameType<Region> execute(final Server server,
550       final RegionServerServices services, User user) throws IOException {
551     this.server = server;
552     this.rsServices = services;
553     useZKForAssignment = server == null ? true :
554       ConfigUtil.useZKForAssignment(server.getConfiguration());
555     if (useCoordinatedStateManager(server)) {
556       std =
557           ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
558               .getSplitTransactionCoordination().getDefaultDetails();
559     }
560     PairOfSameType<Region> regions = createDaughters(server, services, user);
561     if (this.parent.getCoprocessorHost() != null) {
562       if (user == null) {
563         parent.getCoprocessorHost().preSplitAfterPONR();
564       } else {
565         try {
566           user.getUGI().doAs(new PrivilegedExceptionAction<Void>() {
567             @Override
568             public Void run() throws Exception {
569               parent.getCoprocessorHost().preSplitAfterPONR();
570               return null;
571             }
572           });
573         } catch (InterruptedException ie) {
574           InterruptedIOException iioe = new InterruptedIOException();
575           iioe.initCause(ie);
576           throw iioe;
577         }
578       }
579     }
580     regions = stepsAfterPONR(server, services, regions, user);
581 
582     transition(SplitTransactionPhase.COMPLETED);
583 
584     return regions;
585   }
586 
587   @Deprecated
588   public PairOfSameType<Region> stepsAfterPONR(final Server server,
589       final RegionServerServices services, final PairOfSameType<Region> regions)
590       throws IOException {
591     return stepsAfterPONR(server, services, regions, null);
592   }
593 
594   public PairOfSameType<Region> stepsAfterPONR(final Server server,
595       final RegionServerServices services, final PairOfSameType<Region> regions, User user)
596       throws IOException {
597     openDaughters(server, services, regions.getFirst(), regions.getSecond());
598     if (useCoordinatedStateManager(server)) {
599       ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
600           .getSplitTransactionCoordination().completeSplitTransaction(services, regions.getFirst(),
601             regions.getSecond(), std, parent);
602     }
603 
604     transition(SplitTransactionPhase.BEFORE_POST_SPLIT_HOOK);
605 
606     // Coprocessor callback
607     if (parent.getCoprocessorHost() != null) {
608       if (user == null) {
609         this.parent.getCoprocessorHost().postSplit(regions.getFirst(), regions.getSecond());
610       } else {
611         try {
612           user.getUGI().doAs(new PrivilegedExceptionAction<Void>() {
613             @Override
614             public Void run() throws Exception {
615               parent.getCoprocessorHost().postSplit(regions.getFirst(), regions.getSecond());
616               return null;
617             }
618           });
619         } catch (InterruptedException ie) {
620           InterruptedIOException iioe = new InterruptedIOException();
621           iioe.initCause(ie);
622           throw iioe;
623         }
624       }
625     }
626 
627     transition(SplitTransactionPhase.AFTER_POST_SPLIT_HOOK);
628 
629     return regions;
630   }
631 
632   private void offlineParentInMetaAndputMetaEntries(HConnection hConnection,
633       HRegionInfo parent, HRegionInfo splitA, HRegionInfo splitB,
634       ServerName serverName, List<Mutation> metaEntries, int regionReplication)
635           throws IOException {
636     List<Mutation> mutations = metaEntries;
637     HRegionInfo copyOfParent = new HRegionInfo(parent);
638     copyOfParent.setOffline(true);
639     copyOfParent.setSplit(true);
640 
641     //Put for parent
642     Put putParent = MetaTableAccessor.makePutFromRegionInfo(copyOfParent);
643     MetaTableAccessor.addDaughtersToPut(putParent, splitA, splitB);
644     mutations.add(putParent);
645     
646     //Puts for daughters
647     Put putA = MetaTableAccessor.makePutFromRegionInfo(splitA);
648     Put putB = MetaTableAccessor.makePutFromRegionInfo(splitB);
649 
650     addLocation(putA, serverName, 1); //these are new regions, openSeqNum = 1 is fine.
651     addLocation(putB, serverName, 1);
652     mutations.add(putA);
653     mutations.add(putB);
654 
655     // Add empty locations for region replicas of daughters so that number of replicas can be
656     // cached whenever the primary region is looked up from meta
657     for (int i = 1; i < regionReplication; i++) {
658       addEmptyLocation(putA, i);
659       addEmptyLocation(putB, i);
660     }
661 
662     MetaTableAccessor.mutateMetaTable(hConnection, mutations);
663   }
664 
665   private static Put addEmptyLocation(final Put p, int replicaId){
666     p.addImmutable(HConstants.CATALOG_FAMILY, MetaTableAccessor.getServerColumn(replicaId), null);
667     p.addImmutable(HConstants.CATALOG_FAMILY, MetaTableAccessor.getStartCodeColumn(replicaId),
668       null);
669     p.addImmutable(HConstants.CATALOG_FAMILY, MetaTableAccessor.getSeqNumColumn(replicaId), null);
670     return p;
671   }
672 
673   public Put addLocation(final Put p, final ServerName sn, long openSeqNum) {
674     p.addImmutable(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER,
675       Bytes.toBytes(sn.getHostAndPort()));
676     p.addImmutable(HConstants.CATALOG_FAMILY, HConstants.STARTCODE_QUALIFIER,
677       Bytes.toBytes(sn.getStartcode()));
678     p.addImmutable(HConstants.CATALOG_FAMILY, HConstants.SEQNUM_QUALIFIER,
679         Bytes.toBytes(openSeqNum));
680     return p;
681   }
682 
683   /*
684    * Open daughter region in its own thread.
685    * If we fail, abort this hosting server.
686    */
687   class DaughterOpener extends HasThread {
688     private final Server server;
689     private final HRegion r;
690     private Throwable t = null;
691 
692     DaughterOpener(final Server s, final HRegion r) {
693       super((s == null? "null-services": s.getServerName()) +
694         "-daughterOpener=" + r.getRegionInfo().getEncodedName());
695       setDaemon(true);
696       this.server = s;
697       this.r = r;
698     }
699 
700     /**
701      * @return Null if open succeeded else exception that causes us fail open.
702      * Call it after this thread exits else you may get wrong view on result.
703      */
704     Throwable getException() {
705       return this.t;
706     }
707 
708     @Override
709     public void run() {
710       try {
711         openDaughterRegion(this.server, r);
712       } catch (Throwable t) {
713         this.t = t;
714       }
715     }
716   }
717 
718   /**
719    * Open daughter regions, add them to online list and update meta.
720    * @param server
721    * @param daughter
722    * @throws IOException
723    * @throws KeeperException
724    */
725   void openDaughterRegion(final Server server, final HRegion daughter)
726   throws IOException, KeeperException {
727     HRegionInfo hri = daughter.getRegionInfo();
728     LoggingProgressable reporter = server == null ? null
729         : new LoggingProgressable(hri, server.getConfiguration().getLong(
730             "hbase.regionserver.split.daughter.open.log.interval", 10000));
731     daughter.openHRegion(reporter);
732   }
733 
734   static class LoggingProgressable implements CancelableProgressable {
735     private final HRegionInfo hri;
736     private long lastLog = -1;
737     private final long interval;
738 
739     LoggingProgressable(final HRegionInfo hri, final long interval) {
740       this.hri = hri;
741       this.interval = interval;
742     }
743 
744     @Override
745     public boolean progress() {
746       long now = EnvironmentEdgeManager.currentTime();
747       if (now - lastLog > this.interval) {
748         LOG.info("Opening " + this.hri.getRegionNameAsString());
749         this.lastLog = now;
750       }
751       return true;
752     }
753   }
754 
755   private boolean useCoordinatedStateManager(final Server server) {
756     return server != null && useZKForAssignment && server.getCoordinatedStateManager() != null;
757   }
758 
759   /**
760    * Creates reference files for top and bottom half of the
761    * @param hstoreFilesToSplit map of store files to create half file references for.
762    * @return the number of reference files that were created.
763    * @throws IOException
764    */
765   private Pair<Integer, Integer> splitStoreFiles(
766       final Map<byte[], List<StoreFile>> hstoreFilesToSplit)
767       throws IOException {
768     if (hstoreFilesToSplit == null) {
769       // Could be null because close didn't succeed -- for now consider it fatal
770       throw new IOException("Close returned empty list of StoreFiles");
771     }
772     // The following code sets up a thread pool executor with as many slots as
773     // there's files to split. It then fires up everything, waits for
774     // completion and finally checks for any exception
775     int nbFiles = 0;
776     for (Map.Entry<byte[], List<StoreFile>> entry: hstoreFilesToSplit.entrySet()) {
777         nbFiles += entry.getValue().size();
778     }
779     if (nbFiles == 0) {
780       // no file needs to be splitted.
781       return new Pair<Integer, Integer>(0,0);
782     }
783     // Default max #threads to use is the smaller of table's configured number of blocking store
784     // files or the available number of logical cores.
785     int defMaxThreads = Math.min(parent.conf.getInt(HStore.BLOCKING_STOREFILES_KEY,
786                 HStore.DEFAULT_BLOCKING_STOREFILE_COUNT),
787             Runtime.getRuntime().availableProcessors());
788     // Max #threads is the smaller of the number of storefiles or the default max determined above.
789     int maxThreads = Math.min(parent.conf.getInt(HConstants.REGION_SPLIT_THREADS_MAX,
790                 defMaxThreads), nbFiles);
791     LOG.info("Preparing to split " + nbFiles + " storefiles for region " + this.parent +
792             " using " + maxThreads + " threads");
793     ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
794     builder.setNameFormat("StoreFileSplitter-%1$d");
795     ThreadFactory factory = builder.build();
796     ThreadPoolExecutor threadPool =
797       (ThreadPoolExecutor) Executors.newFixedThreadPool(maxThreads, factory);
798     List<Future<Pair<Path,Path>>> futures = new ArrayList<Future<Pair<Path,Path>>> (nbFiles);
799 
800     // Split each store file.
801     for (Map.Entry<byte[], List<StoreFile>> entry: hstoreFilesToSplit.entrySet()) {
802       for (StoreFile sf: entry.getValue()) {
803         StoreFileSplitter sfs = new StoreFileSplitter(entry.getKey(), sf);
804         futures.add(threadPool.submit(sfs));
805       }
806     }
807     // Shutdown the pool
808     threadPool.shutdown();
809 
810     // Wait for all the tasks to finish
811     try {
812       boolean stillRunning = !threadPool.awaitTermination(
813           this.fileSplitTimeout, TimeUnit.MILLISECONDS);
814       if (stillRunning) {
815         threadPool.shutdownNow();
816         // wait for the thread to shutdown completely.
817         while (!threadPool.isTerminated()) {
818           Thread.sleep(50);
819         }
820         throw new IOException("Took too long to split the" +
821             " files and create the references, aborting split");
822       }
823     } catch (InterruptedException e) {
824       throw (InterruptedIOException)new InterruptedIOException().initCause(e);
825     }
826 
827     int created_a = 0;
828     int created_b = 0;
829     // Look for any exception
830     for (Future<Pair<Path, Path>> future : futures) {
831       try {
832         Pair<Path, Path> p = future.get();
833         created_a += p.getFirst() != null ? 1 : 0;
834         created_b += p.getSecond() != null ? 1 : 0;
835       } catch (InterruptedException e) {
836         throw (InterruptedIOException) new InterruptedIOException().initCause(e);
837       } catch (ExecutionException e) {
838         throw new IOException(e);
839       }
840     }
841 
842     if (LOG.isDebugEnabled()) {
843       LOG.debug("Split storefiles for region " + this.parent + " Daughter A: " + created_a
844           + " storefiles, Daughter B: " + created_b + " storefiles.");
845     }
846     return new Pair<Integer, Integer>(created_a, created_b);
847   }
848 
849   private Pair<Path, Path> splitStoreFile(final byte[] family, final StoreFile sf)
850       throws IOException {
851     if (LOG.isDebugEnabled()) {
852         LOG.debug("Splitting started for store file: " + sf.getPath() + " for region: " +
853                   this.parent);
854     }
855     HRegionFileSystem fs = this.parent.getRegionFileSystem();
856     String familyName = Bytes.toString(family);
857 
858     Path path_a =
859         fs.splitStoreFile(this.hri_a, familyName, sf, this.splitrow, false,
860           this.parent.getSplitPolicy());
861     Path path_b =
862         fs.splitStoreFile(this.hri_b, familyName, sf, this.splitrow, true,
863           this.parent.getSplitPolicy());
864     if (LOG.isDebugEnabled()) {
865         LOG.debug("Splitting complete for store file: " + sf.getPath() + " for region: " +
866                   this.parent);
867     }
868     return new Pair<Path,Path>(path_a, path_b);
869   }
870 
871   /**
872    * Utility class used to do the file splitting / reference writing
873    * in parallel instead of sequentially.
874    */
875   class StoreFileSplitter implements Callable<Pair<Path,Path>> {
876     private final byte[] family;
877     private final StoreFile sf;
878 
879     /**
880      * Constructor that takes what it needs to split
881      * @param family Family that contains the store file
882      * @param sf which file
883      */
884     public StoreFileSplitter(final byte[] family, final StoreFile sf) {
885       this.sf = sf;
886       this.family = family;
887     }
888 
889     public Pair<Path,Path> call() throws IOException {
890       return splitStoreFile(family, sf);
891     }
892   }
893   
894   @Override
895   public boolean rollback(final Server server, final RegionServerServices services)
896       throws IOException {
897     if (User.isHBaseSecurityEnabled(parent.getBaseConf())) {
898       LOG.warn("Should use rollback(Server, RegionServerServices, User)");
899     }
900     return rollback(server, services, null);
901   }
902 
903   /**
904    * @param server Hosting server instance (May be null when testing).
905    * @param services
906    * @throws IOException If thrown, rollback failed.  Take drastic action.
907    * @return True if we successfully rolled back, false if we got to the point
908    * of no return and so now need to abort the server to minimize damage.
909    */
910   @Override
911   @SuppressWarnings("deprecation")
912   public boolean rollback(final Server server, final RegionServerServices services, User user)
913   throws IOException {
914     // Coprocessor callback
915     if (this.parent.getCoprocessorHost() != null) {
916       if (user == null) {
917         this.parent.getCoprocessorHost().preRollBackSplit();
918       } else {
919         try {
920           user.getUGI().doAs(new PrivilegedExceptionAction<Void>() {
921             @Override
922             public Void run() throws Exception {
923               parent.getCoprocessorHost().preRollBackSplit();
924               return null;
925             }
926           });
927         } catch (InterruptedException ie) {
928           InterruptedIOException iioe = new InterruptedIOException();
929           iioe.initCause(ie);
930           throw iioe;
931         }
932       }
933     }
934 
935     boolean result = true;
936     ListIterator<JournalEntry> iterator =
937       this.journal.listIterator(this.journal.size());
938     // Iterate in reverse.
939     while (iterator.hasPrevious()) {
940       JournalEntry je = iterator.previous();
941 
942       transition(je.getPhase(), true);
943 
944       switch(je.getPhase()) {
945 
946       case SET_SPLITTING:
947         if (useCoordinatedStateManager(server) && server instanceof HRegionServer) {
948           ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
949               .getSplitTransactionCoordination().clean(this.parent.getRegionInfo());
950         } else if (services != null && !useZKForAssignment
951             && !services.reportRegionStateTransition(TransitionCode.SPLIT_REVERTED,
952                 parent.getRegionInfo(), hri_a, hri_b)) {
953           return false;
954         }
955         break;
956 
957       case CREATE_SPLIT_DIR:
958         this.parent.writestate.writesEnabled = true;
959         this.parent.getRegionFileSystem().cleanupSplitsDir();
960         break;
961 
962       case CLOSED_PARENT_REGION:
963         try {
964           // So, this returns a seqid but if we just closed and then reopened, we
965           // should be ok. On close, we flushed using sequenceid obtained from
966           // hosting regionserver so no need to propagate the sequenceid returned
967           // out of initialize below up into regionserver as we normally do.
968           // TODO: Verify.
969           this.parent.initialize();
970         } catch (IOException e) {
971           LOG.error("Failed rollbacking CLOSED_PARENT_REGION of region " +
972             this.parent.getRegionInfo().getRegionNameAsString(), e);
973           throw new RuntimeException(e);
974         }
975         break;
976 
977       case STARTED_REGION_A_CREATION:
978         this.parent.getRegionFileSystem().cleanupDaughterRegion(this.hri_a);
979         break;
980 
981       case STARTED_REGION_B_CREATION:
982         this.parent.getRegionFileSystem().cleanupDaughterRegion(this.hri_b);
983         break;
984 
985       case OFFLINED_PARENT:
986         if (services != null) services.addToOnlineRegions(this.parent);
987         break;
988 
989       case PONR:
990         // We got to the point-of-no-return so we need to just abort. Return
991         // immediately.  Do not clean up created daughter regions.  They need
992         // to be in place so we don't delete the parent region mistakenly.
993         // See HBASE-3872.
994         return false;
995 
996       // Informational only cases
997       case STARTED:
998       case PREPARED:
999       case BEFORE_PRE_SPLIT_HOOK:
1000       case AFTER_PRE_SPLIT_HOOK:
1001       case BEFORE_POST_SPLIT_HOOK:
1002       case AFTER_POST_SPLIT_HOOK:
1003       case OPENED_REGION_A:
1004       case OPENED_REGION_B:
1005       case COMPLETED:
1006         break;
1007 
1008       default:
1009         throw new RuntimeException("Unhandled journal entry: " + je);
1010       }
1011     }
1012     // Coprocessor callback
1013     if (this.parent.getCoprocessorHost() != null) {
1014       if (user == null) {
1015         this.parent.getCoprocessorHost().postRollBackSplit();
1016       } else {
1017         try {
1018           user.getUGI().doAs(new PrivilegedExceptionAction<Void>() {
1019             @Override
1020             public Void run() throws Exception {
1021               parent.getCoprocessorHost().postRollBackSplit();
1022               return null;
1023             }
1024           });
1025         } catch (InterruptedException ie) {
1026           InterruptedIOException iioe = new InterruptedIOException();
1027           iioe.initCause(ie);
1028           throw iioe;
1029         }
1030       }
1031     }
1032     return result;
1033   }
1034 
1035   HRegionInfo getFirstDaughter() {
1036     return hri_a;
1037   }
1038 
1039   HRegionInfo getSecondDaughter() {
1040     return hri_b;
1041   }
1042 
1043   @Override
1044   public List<JournalEntry> getJournal() {
1045     return journal;
1046   }
1047 
1048   @Override
1049   public SplitTransaction registerTransactionListener(TransactionListener listener) {
1050     listeners.add(listener);
1051     return this;
1052   }
1053 
1054   @Override
1055   public Server getServer() {
1056     return server;
1057   }
1058 
1059   @Override
1060   public RegionServerServices getRegionServerServices() {
1061     return rsServices;
1062   }
1063 }