View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.regionserver;
20  
21  import java.io.IOException;
22  import java.io.InterruptedIOException;
23  import java.util.ArrayList;
24  import java.util.List;
25  import java.util.ListIterator;
26  import java.util.Map;
27  import java.util.concurrent.Callable;
28  import java.util.concurrent.ExecutionException;
29  import java.util.concurrent.Executors;
30  import java.util.concurrent.Future;
31  import java.util.concurrent.ThreadFactory;
32  import java.util.concurrent.ThreadPoolExecutor;
33  import java.util.concurrent.TimeUnit;
34  
35  import org.apache.commons.logging.Log;
36  import org.apache.commons.logging.LogFactory;
37  import org.apache.hadoop.classification.InterfaceAudience;
38  import org.apache.hadoop.hbase.HConstants;
39  import org.apache.hadoop.hbase.HRegionInfo;
40  import org.apache.hadoop.hbase.Server;
41  import org.apache.hadoop.hbase.ServerName;
42  import org.apache.hadoop.hbase.MetaTableAccessor;
43  import org.apache.hadoop.hbase.client.HConnection;
44  import org.apache.hadoop.hbase.client.Mutation;
45  import org.apache.hadoop.hbase.client.Put;
46  import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
47  import org.apache.hadoop.hbase.coordination.SplitTransactionCoordination;
48  import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
49  import org.apache.hadoop.hbase.util.Bytes;
50  import org.apache.hadoop.hbase.util.CancelableProgressable;
51  import org.apache.hadoop.hbase.util.ConfigUtil;
52  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
53  import org.apache.hadoop.hbase.util.HasThread;
54  import org.apache.hadoop.hbase.util.PairOfSameType;
55  import org.apache.zookeeper.KeeperException;
56  
57  import com.google.common.util.concurrent.ThreadFactoryBuilder;
58  
/**
 * Executes region split as a "transaction".  Call {@link #prepare()} to setup
 * the transaction, {@link #execute(Server, RegionServerServices)} to run the
 * transaction and {@link #rollback(Server, RegionServerServices)} to cleanup if execute fails.
 *
 * <p>Here is an example of how you would use this class:
 * <pre>
 *  SplitTransaction st = new SplitTransaction(parent, midKey);
 *  if (!st.prepare()) return;
 *  try {
 *    st.execute(server, services);
 *  } catch (IOException ioe) {
 *    try {
 *      st.rollback(server, services);
 *      return;
 *    } catch (RuntimeException e) {
 *      myAbortable.abort("Failed split, abort");
 *    }
 *  }
 * </pre>
 * <p>This class is not thread safe.  Caller needs to ensure split is run by
 * one thread only.
 */
82  @InterfaceAudience.Private
83  public class SplitTransaction {
84    private static final Log LOG = LogFactory.getLog(SplitTransaction.class);
85  
86    /*
87     * Region to split
88     */
89    private final HRegion parent;
90    private HRegionInfo hri_a;
91    private HRegionInfo hri_b;
92    private long fileSplitTimeout = 30000;
93    public SplitTransactionCoordination.SplitTransactionDetails std;
94    boolean useZKForAssignment;
95  
96    /*
97     * Row to split around
98     */
99    private final byte [] splitrow;
100 
101   /**
102    * Types to add to the transaction journal.
103    * Each enum is a step in the split transaction. Used to figure how much
104    * we need to rollback.
105    */
106   enum JournalEntry {
107     /**
108      * Set region as in transition, set it into SPLITTING state.
109      */
110     SET_SPLITTING,
111     /**
112      * We created the temporary split data directory.
113      */
114     CREATE_SPLIT_DIR,
115     /**
116      * Closed the parent region.
117      */
118     CLOSED_PARENT_REGION,
119     /**
120      * The parent has been taken out of the server's online regions list.
121      */
122     OFFLINED_PARENT,
123     /**
124      * Started in on creation of the first daughter region.
125      */
126     STARTED_REGION_A_CREATION,
127     /**
128      * Started in on the creation of the second daughter region.
129      */
130     STARTED_REGION_B_CREATION,
131     /**
132      * Point of no return.
133      * If we got here, then transaction is not recoverable other than by
134      * crashing out the regionserver.
135      */
136     PONR
137   }
138 
139   /*
140    * Journal of how far the split transaction has progressed.
141    */
142   private final List<JournalEntry> journal = new ArrayList<JournalEntry>();
143 
144   /**
145    * Constructor
146    * @param r Region to split
147    * @param splitrow Row to split around
148    */
149   public SplitTransaction(final HRegion r, final byte [] splitrow) {
150     this.parent = r;
151     this.splitrow = splitrow;
152   }
153 
154   /**
155    * Does checks on split inputs.
156    * @return <code>true</code> if the region is splittable else
157    * <code>false</code> if it is not (e.g. its already closed, etc.).
158    */
159   public boolean prepare() {
160     if (!this.parent.isSplittable()) return false;
161     // Split key can be null if this region is unsplittable; i.e. has refs.
162     if (this.splitrow == null) return false;
163     HRegionInfo hri = this.parent.getRegionInfo();
164     parent.prepareToSplit();
165     // Check splitrow.
166     byte [] startKey = hri.getStartKey();
167     byte [] endKey = hri.getEndKey();
168     if (Bytes.equals(startKey, splitrow) ||
169         !this.parent.getRegionInfo().containsRow(splitrow)) {
170       LOG.info("Split row is not inside region key range or is equal to " +
171           "startkey: " + Bytes.toStringBinary(this.splitrow));
172       return false;
173     }
174     long rid = getDaughterRegionIdTimestamp(hri);
175     this.hri_a = new HRegionInfo(hri.getTable(), startKey, this.splitrow, false, rid);
176     this.hri_b = new HRegionInfo(hri.getTable(), this.splitrow, endKey, false, rid);
177     return true;
178   }
179 
180   /**
181    * Calculate daughter regionid to use.
182    * @param hri Parent {@link HRegionInfo}
183    * @return Daughter region id (timestamp) to use.
184    */
185   private static long getDaughterRegionIdTimestamp(final HRegionInfo hri) {
186     long rid = EnvironmentEdgeManager.currentTimeMillis();
187     // Regionid is timestamp.  Can't be less than that of parent else will insert
188     // at wrong location in hbase:meta (See HBASE-710).
189     if (rid < hri.getRegionId()) {
190       LOG.warn("Clock skew; parent regions id is " + hri.getRegionId() +
191         " but current time here is " + rid);
192       rid = hri.getRegionId() + 1;
193     }
194     return rid;
195   }
196 
197   private static IOException closedByOtherException = new IOException(
198       "Failed to close region: already closed by another thread");
199 
200   /**
201    * Prepare the regions and region files.
202    * @param server Hosting server instance.  Can be null when testing (won't try
203    * and update in zk if a null server)
204    * @param services Used to online/offline regions.
205    * @throws IOException If thrown, transaction failed.
206    *    Call {@link #rollback(Server, RegionServerServices)}
207    * @return Regions created
208    */
209   /* package */PairOfSameType<HRegion> createDaughters(final Server server,
210       final RegionServerServices services) throws IOException {
211     LOG.info("Starting split of region " + this.parent);
212     if ((server != null && server.isStopped()) ||
213         (services != null && services.isStopping())) {
214       throw new IOException("Server is stopped or stopping");
215     }
216     assert !this.parent.lock.writeLock().isHeldByCurrentThread():
217       "Unsafe to hold write lock while performing RPCs";
218 
219     // Coprocessor callback
220     if (this.parent.getCoprocessorHost() != null) {
221       this.parent.getCoprocessorHost().preSplit();
222     }
223 
224     // Coprocessor callback
225     if (this.parent.getCoprocessorHost() != null) {
226       this.parent.getCoprocessorHost().preSplit(this.splitrow);
227     }
228 
229     // If true, no cluster to write meta edits to or to update znodes in.
230     boolean testing = server == null? true:
231         server.getConfiguration().getBoolean("hbase.testing.nocluster", false);
232     this.fileSplitTimeout = testing ? this.fileSplitTimeout :
233         server.getConfiguration().getLong("hbase.regionserver.fileSplitTimeout",
234           this.fileSplitTimeout);
235 
236     PairOfSameType<HRegion> daughterRegions = stepsBeforePONR(server, services, testing);
237 
238     List<Mutation> metaEntries = new ArrayList<Mutation>();
239     if (this.parent.getCoprocessorHost() != null) {
240       if (this.parent.getCoprocessorHost().
241           preSplitBeforePONR(this.splitrow, metaEntries)) {
242         throw new IOException("Coprocessor bypassing region "
243             + this.parent.getRegionNameAsString() + " split.");
244       }
245       try {
246         for (Mutation p : metaEntries) {
247           HRegionInfo.parseRegionName(p.getRow());
248         }
249       } catch (IOException e) {
250         LOG.error("Row key of mutation from coprossor is not parsable as region name."
251             + "Mutations from coprocessor should only for hbase:meta table.");
252         throw e;
253       }
254     }
255 
256     // This is the point of no return.  Adding subsequent edits to .META. as we
257     // do below when we do the daughter opens adding each to .META. can fail in
258     // various interesting ways the most interesting of which is a timeout
259     // BUT the edits all go through (See HBASE-3872).  IF we reach the PONR
260     // then subsequent failures need to crash out this regionserver; the
261     // server shutdown processing should be able to fix-up the incomplete split.
262     // The offlined parent will have the daughters as extra columns.  If
263     // we leave the daughter regions in place and do not remove them when we
264     // crash out, then they will have their references to the parent in place
265     // still and the server shutdown fixup of .META. will point to these
266     // regions.
267     // We should add PONR JournalEntry before offlineParentInMeta,so even if
268     // OfflineParentInMeta timeout,this will cause regionserver exit,and then
269     // master ServerShutdownHandler will fix daughter & avoid data loss. (See
270     // HBase-4562).
271     this.journal.add(JournalEntry.PONR);
272 
273     // Edit parent in meta.  Offlines parent region and adds splita and splitb
274     // as an atomic update. See HBASE-7721. This update to META makes the region
275     // will determine whether the region is split or not in case of failures.
276     // If it is successful, master will roll-forward, if not, master will rollback
277     // and assign the parent region.
278     if (!testing && useZKForAssignment) {
279       if (metaEntries == null || metaEntries.isEmpty()) {
280         MetaTableAccessor.splitRegion(server.getShortCircuitConnection(),
281           parent.getRegionInfo(), daughterRegions.getFirst().getRegionInfo(),
282           daughterRegions.getSecond().getRegionInfo(), server.getServerName());
283       } else {
284         offlineParentInMetaAndputMetaEntries(server.getShortCircuitConnection(),
285           parent.getRegionInfo(), daughterRegions.getFirst().getRegionInfo(), daughterRegions
286               .getSecond().getRegionInfo(), server.getServerName(), metaEntries);
287       }
288     } else if (services != null && !useZKForAssignment) {
289       if (!services.reportRegionStateTransition(TransitionCode.SPLIT_PONR,
290           parent.getRegionInfo(), hri_a, hri_b)) {
291         // Passed PONR, let SSH clean it up
292         throw new IOException("Failed to notify master that split passed PONR: "
293           + parent.getRegionInfo().getRegionNameAsString());
294       }
295     }
296     return daughterRegions;
297   }
298 
299   public PairOfSameType<HRegion> stepsBeforePONR(final Server server,
300       final RegionServerServices services, boolean testing) throws IOException {
301 
302     if (useCoordinatedStateManager(server)) {
303       if (std == null) {
304         std =
305             ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
306                 .getSplitTransactionCoordination().getDefaultDetails();
307       }
308       ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
309           .getSplitTransactionCoordination().startSplitTransaction(parent, server.getServerName(),
310             hri_a, hri_b);
311     } else if (services != null && !useZKForAssignment) {
312       if (!services.reportRegionStateTransition(TransitionCode.READY_TO_SPLIT,
313           parent.getRegionInfo(), hri_a, hri_b)) {
314         throw new IOException("Failed to get ok from master to split "
315           + parent.getRegionNameAsString());
316       }
317     }
318     this.journal.add(JournalEntry.SET_SPLITTING);
319     if (useCoordinatedStateManager(server)) {
320       ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
321           .getSplitTransactionCoordination().waitForSplitTransaction(services, parent, hri_a,
322             hri_b, std);
323     }
324 
325     this.parent.getRegionFileSystem().createSplitsDir();
326     this.journal.add(JournalEntry.CREATE_SPLIT_DIR);
327 
328     Map<byte[], List<StoreFile>> hstoreFilesToSplit = null;
329     Exception exceptionToThrow = null;
330     try{
331       hstoreFilesToSplit = this.parent.close(false);
332     } catch (Exception e) {
333       exceptionToThrow = e;
334     }
335     if (exceptionToThrow == null && hstoreFilesToSplit == null) {
336       // The region was closed by a concurrent thread.  We can't continue
337       // with the split, instead we must just abandon the split.  If we
338       // reopen or split this could cause problems because the region has
339       // probably already been moved to a different server, or is in the
340       // process of moving to a different server.
341       exceptionToThrow = closedByOtherException;
342     }
343     if (exceptionToThrow != closedByOtherException) {
344       this.journal.add(JournalEntry.CLOSED_PARENT_REGION);
345     }
346     if (exceptionToThrow != null) {
347       if (exceptionToThrow instanceof IOException) throw (IOException)exceptionToThrow;
348       throw new IOException(exceptionToThrow);
349     }
350     if (!testing) {
351       services.removeFromOnlineRegions(this.parent, null);
352     }
353     this.journal.add(JournalEntry.OFFLINED_PARENT);
354 
355     // TODO: If splitStoreFiles were multithreaded would we complete steps in
356     // less elapsed time?  St.Ack 20100920
357     //
358     // splitStoreFiles creates daughter region dirs under the parent splits dir
359     // Nothing to unroll here if failure -- clean up of CREATE_SPLIT_DIR will
360     // clean this up.
361     splitStoreFiles(hstoreFilesToSplit);
362 
363     // Log to the journal that we are creating region A, the first daughter
364     // region.  We could fail halfway through.  If we do, we could have left
365     // stuff in fs that needs cleanup -- a storefile or two.  Thats why we
366     // add entry to journal BEFORE rather than AFTER the change.
367     this.journal.add(JournalEntry.STARTED_REGION_A_CREATION);
368     HRegion a = this.parent.createDaughterRegionFromSplits(this.hri_a);
369 
370     // Ditto
371     this.journal.add(JournalEntry.STARTED_REGION_B_CREATION);
372     HRegion b = this.parent.createDaughterRegionFromSplits(this.hri_b);
373     return new PairOfSameType<HRegion>(a, b);
374   }
375 
376   /**
377    * Perform time consuming opening of the daughter regions.
378    * @param server Hosting server instance.  Can be null when testing
379    * @param services Used to online/offline regions.
380    * @param a first daughter region
381    * @param a second daughter region
382    * @throws IOException If thrown, transaction failed.
383    *          Call {@link #rollback(Server, RegionServerServices)}
384    */
385   /* package */void openDaughters(final Server server,
386       final RegionServerServices services, HRegion a, HRegion b)
387       throws IOException {
388     boolean stopped = server != null && server.isStopped();
389     boolean stopping = services != null && services.isStopping();
390     // TODO: Is this check needed here?
391     if (stopped || stopping) {
392       LOG.info("Not opening daughters " +
393           b.getRegionInfo().getRegionNameAsString() +
394           " and " +
395           a.getRegionInfo().getRegionNameAsString() +
396           " because stopping=" + stopping + ", stopped=" + stopped);
397     } else {
398       // Open daughters in parallel.
399       DaughterOpener aOpener = new DaughterOpener(server, a);
400       DaughterOpener bOpener = new DaughterOpener(server, b);
401       aOpener.start();
402       bOpener.start();
403       try {
404         aOpener.join();
405         bOpener.join();
406       } catch (InterruptedException e) {
407         throw (InterruptedIOException)new InterruptedIOException().initCause(e);
408       }
409       if (aOpener.getException() != null) {
410         throw new IOException("Failed " +
411           aOpener.getName(), aOpener.getException());
412       }
413       if (bOpener.getException() != null) {
414         throw new IOException("Failed " +
415           bOpener.getName(), bOpener.getException());
416       }
417       if (services != null) {
418         try {
419           if (useZKForAssignment) {
420             // add 2nd daughter first (see HBASE-4335)
421             services.postOpenDeployTasks(b);
422           } else if (!services.reportRegionStateTransition(TransitionCode.SPLIT,
423               parent.getRegionInfo(), hri_a, hri_b)) {
424             throw new IOException("Failed to report split region to master: "
425               + parent.getRegionInfo().getShortNameToLog());
426           }
427           // Should add it to OnlineRegions
428           services.addToOnlineRegions(b);
429           if (useZKForAssignment) {
430             services.postOpenDeployTasks(a);
431           }
432           services.addToOnlineRegions(a);
433         } catch (KeeperException ke) {
434           throw new IOException(ke);
435         }
436       }
437     }
438   }
439 
440   /**
441    * Run the transaction.
442    * @param server Hosting server instance.  Can be null when testing
443    * @param services Used to online/offline regions.
444    * @throws IOException If thrown, transaction failed.
445    *          Call {@link #rollback(Server, RegionServerServices)}
446    * @return Regions created
447    * @throws IOException
448    * @see #rollback(Server, RegionServerServices)
449    */
450   public PairOfSameType<HRegion> execute(final Server server,
451       final RegionServerServices services)
452   throws IOException {
453     useZKForAssignment = server == null ? true :
454       ConfigUtil.useZKForAssignment(server.getConfiguration());
455     if (useCoordinatedStateManager(server)) {
456       std =
457           ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
458               .getSplitTransactionCoordination().getDefaultDetails();
459     }
460     PairOfSameType<HRegion> regions = createDaughters(server, services);
461     if (this.parent.getCoprocessorHost() != null) {
462       this.parent.getCoprocessorHost().preSplitAfterPONR();
463     }
464     return stepsAfterPONR(server, services, regions);
465   }
466 
467   public PairOfSameType<HRegion> stepsAfterPONR(final Server server,
468       final RegionServerServices services, PairOfSameType<HRegion> regions)
469       throws IOException {
470     openDaughters(server, services, regions.getFirst(), regions.getSecond());
471     if (useCoordinatedStateManager(server)) {
472       ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
473           .getSplitTransactionCoordination().completeSplitTransaction(services, regions.getFirst(),
474             regions.getSecond(), std, parent);
475     }
476     // Coprocessor callback
477     if (parent.getCoprocessorHost() != null) {
478       parent.getCoprocessorHost().postSplit(regions.getFirst(), regions.getSecond());
479     }
480 
481 
482     return regions;
483   }
484 
485   private void offlineParentInMetaAndputMetaEntries(HConnection hConnection,
486       HRegionInfo parent, HRegionInfo splitA, HRegionInfo splitB,
487       ServerName serverName, List<Mutation> metaEntries) throws IOException {
488     List<Mutation> mutations = metaEntries;
489     HRegionInfo copyOfParent = new HRegionInfo(parent);
490     copyOfParent.setOffline(true);
491     copyOfParent.setSplit(true);
492 
493     //Put for parent
494     Put putParent = MetaTableAccessor.makePutFromRegionInfo(copyOfParent);
495     MetaTableAccessor.addDaughtersToPut(putParent, splitA, splitB);
496     mutations.add(putParent);
497     
498     //Puts for daughters
499     Put putA = MetaTableAccessor.makePutFromRegionInfo(splitA);
500     Put putB = MetaTableAccessor.makePutFromRegionInfo(splitB);
501 
502     addLocation(putA, serverName, 1); //these are new regions, openSeqNum = 1 is fine.
503     addLocation(putB, serverName, 1);
504     mutations.add(putA);
505     mutations.add(putB);
506     MetaTableAccessor.mutateMetaTable(hConnection, mutations);
507   }
508 
509   public Put addLocation(final Put p, final ServerName sn, long openSeqNum) {
510     p.addImmutable(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER,
511       Bytes.toBytes(sn.getHostAndPort()));
512     p.addImmutable(HConstants.CATALOG_FAMILY, HConstants.STARTCODE_QUALIFIER,
513       Bytes.toBytes(sn.getStartcode()));
514     p.addImmutable(HConstants.CATALOG_FAMILY, HConstants.SEQNUM_QUALIFIER,
515         Bytes.toBytes(openSeqNum));
516     return p;
517   }
518 
519   /*
520    * Open daughter region in its own thread.
521    * If we fail, abort this hosting server.
522    */
523   class DaughterOpener extends HasThread {
524     private final Server server;
525     private final HRegion r;
526     private Throwable t = null;
527 
528     DaughterOpener(final Server s, final HRegion r) {
529       super((s == null? "null-services": s.getServerName()) +
530         "-daughterOpener=" + r.getRegionInfo().getEncodedName());
531       setDaemon(true);
532       this.server = s;
533       this.r = r;
534     }
535 
536     /**
537      * @return Null if open succeeded else exception that causes us fail open.
538      * Call it after this thread exits else you may get wrong view on result.
539      */
540     Throwable getException() {
541       return this.t;
542     }
543 
544     @Override
545     public void run() {
546       try {
547         openDaughterRegion(this.server, r);
548       } catch (Throwable t) {
549         this.t = t;
550       }
551     }
552   }
553 
554   /**
555    * Open daughter regions, add them to online list and update meta.
556    * @param server
557    * @param daughter
558    * @throws IOException
559    * @throws KeeperException
560    */
561   void openDaughterRegion(final Server server, final HRegion daughter)
562   throws IOException, KeeperException {
563     HRegionInfo hri = daughter.getRegionInfo();
564     LoggingProgressable reporter = server == null ? null
565         : new LoggingProgressable(hri, server.getConfiguration().getLong(
566             "hbase.regionserver.split.daughter.open.log.interval", 10000));
567     daughter.openHRegion(reporter);
568   }
569 
570   static class LoggingProgressable implements CancelableProgressable {
571     private final HRegionInfo hri;
572     private long lastLog = -1;
573     private final long interval;
574 
575     LoggingProgressable(final HRegionInfo hri, final long interval) {
576       this.hri = hri;
577       this.interval = interval;
578     }
579 
580     @Override
581     public boolean progress() {
582       long now = System.currentTimeMillis();
583       if (now - lastLog > this.interval) {
584         LOG.info("Opening " + this.hri.getRegionNameAsString());
585         this.lastLog = now;
586       }
587       return true;
588     }
589   }
590 
591   private boolean useCoordinatedStateManager(final Server server) {
592     return server != null && useZKForAssignment && server.getCoordinatedStateManager() != null;
593   }
594 
595   private void splitStoreFiles(final Map<byte[], List<StoreFile>> hstoreFilesToSplit)
596       throws IOException {
597     if (hstoreFilesToSplit == null) {
598       // Could be null because close didn't succeed -- for now consider it fatal
599       throw new IOException("Close returned empty list of StoreFiles");
600     }
601     // The following code sets up a thread pool executor with as many slots as
602     // there's files to split. It then fires up everything, waits for
603     // completion and finally checks for any exception
604     int nbFiles = hstoreFilesToSplit.size();
605     if (nbFiles == 0) {
606       // no file needs to be splitted.
607       return;
608     }
609     ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
610     builder.setNameFormat("StoreFileSplitter-%1$d");
611     ThreadFactory factory = builder.build();
612     ThreadPoolExecutor threadPool =
613       (ThreadPoolExecutor) Executors.newFixedThreadPool(nbFiles, factory);
614     List<Future<Void>> futures = new ArrayList<Future<Void>>(nbFiles);
615 
616     // Split each store file.
617     for (Map.Entry<byte[], List<StoreFile>> entry: hstoreFilesToSplit.entrySet()) {
618       for (StoreFile sf: entry.getValue()) {
619         StoreFileSplitter sfs = new StoreFileSplitter(entry.getKey(), sf);
620         futures.add(threadPool.submit(sfs));
621       }
622     }
623     // Shutdown the pool
624     threadPool.shutdown();
625 
626     // Wait for all the tasks to finish
627     try {
628       boolean stillRunning = !threadPool.awaitTermination(
629           this.fileSplitTimeout, TimeUnit.MILLISECONDS);
630       if (stillRunning) {
631         threadPool.shutdownNow();
632         // wait for the thread to shutdown completely.
633         while (!threadPool.isTerminated()) {
634           Thread.sleep(50);
635         }
636         throw new IOException("Took too long to split the" +
637             " files and create the references, aborting split");
638       }
639     } catch (InterruptedException e) {
640       throw (InterruptedIOException)new InterruptedIOException().initCause(e);
641     }
642 
643     // Look for any exception
644     for (Future<Void> future: futures) {
645       try {
646         future.get();
647       } catch (InterruptedException e) {
648         throw (InterruptedIOException)new InterruptedIOException().initCause(e);
649       } catch (ExecutionException e) {
650         throw new IOException(e);
651       }
652     }
653   }
654 
655   private void splitStoreFile(final byte[] family, final StoreFile sf) throws IOException {
656     HRegionFileSystem fs = this.parent.getRegionFileSystem();
657     String familyName = Bytes.toString(family);
658     fs.splitStoreFile(this.hri_a, familyName, sf, this.splitrow, false);
659     fs.splitStoreFile(this.hri_b, familyName, sf, this.splitrow, true);
660   }
661 
662   /**
663    * Utility class used to do the file splitting / reference writing
664    * in parallel instead of sequentially.
665    */
666   class StoreFileSplitter implements Callable<Void> {
667     private final byte[] family;
668     private final StoreFile sf;
669 
670     /**
671      * Constructor that takes what it needs to split
672      * @param family Family that contains the store file
673      * @param sf which file
674      */
675     public StoreFileSplitter(final byte[] family, final StoreFile sf) {
676       this.sf = sf;
677       this.family = family;
678     }
679 
680     public Void call() throws IOException {
681       splitStoreFile(family, sf);
682       return null;
683     }
684   }
685 
686   /**
687    * @param server Hosting server instance (May be null when testing).
688    * @param services
689    * @throws IOException If thrown, rollback failed.  Take drastic action.
690    * @return True if we successfully rolled back, false if we got to the point
691    * of no return and so now need to abort the server to minimize damage.
692    */
693   @SuppressWarnings("deprecation")
694   public boolean rollback(final Server server, final RegionServerServices services)
695   throws IOException {
696     // Coprocessor callback
697     if (this.parent.getCoprocessorHost() != null) {
698       this.parent.getCoprocessorHost().preRollBackSplit();
699     }
700 
701     boolean result = true;
702     ListIterator<JournalEntry> iterator =
703       this.journal.listIterator(this.journal.size());
704     // Iterate in reverse.
705     while (iterator.hasPrevious()) {
706       JournalEntry je = iterator.previous();
707       switch(je) {
708 
709       case SET_SPLITTING:
710         if (useCoordinatedStateManager(server) && server instanceof HRegionServer) {
711           ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
712               .getSplitTransactionCoordination().clean(this.parent.getRegionInfo());
713         } else if (services != null && !useZKForAssignment
714             && !services.reportRegionStateTransition(TransitionCode.SPLIT_REVERTED,
715                 parent.getRegionInfo(), hri_a, hri_b)) {
716           return false;
717         }
718         break;
719 
720       case CREATE_SPLIT_DIR:
721         this.parent.writestate.writesEnabled = true;
722         this.parent.getRegionFileSystem().cleanupSplitsDir();
723         break;
724 
725       case CLOSED_PARENT_REGION:
726         try {
727           // So, this returns a seqid but if we just closed and then reopened, we
728           // should be ok. On close, we flushed using sequenceid obtained from
729           // hosting regionserver so no need to propagate the sequenceid returned
730           // out of initialize below up into regionserver as we normally do.
731           // TODO: Verify.
732           this.parent.initialize();
733         } catch (IOException e) {
734           LOG.error("Failed rollbacking CLOSED_PARENT_REGION of region " +
735             this.parent.getRegionNameAsString(), e);
736           throw new RuntimeException(e);
737         }
738         break;
739 
740       case STARTED_REGION_A_CREATION:
741         this.parent.getRegionFileSystem().cleanupDaughterRegion(this.hri_a);
742         break;
743 
744       case STARTED_REGION_B_CREATION:
745         this.parent.getRegionFileSystem().cleanupDaughterRegion(this.hri_b);
746         break;
747 
748       case OFFLINED_PARENT:
749         if (services != null) services.addToOnlineRegions(this.parent);
750         break;
751 
752       case PONR:
753         // We got to the point-of-no-return so we need to just abort. Return
754         // immediately.  Do not clean up created daughter regions.  They need
755         // to be in place so we don't delete the parent region mistakenly.
756         // See HBASE-3872.
757         return false;
758 
759       default:
760         throw new RuntimeException("Unhandled journal entry: " + je);
761       }
762     }
763     // Coprocessor callback
764     if (this.parent.getCoprocessorHost() != null) {
765       this.parent.getCoprocessorHost().postRollBackSplit();
766     }
767     return result;
768   }
769 
770   HRegionInfo getFirstDaughter() {
771     return hri_a;
772   }
773 
774   HRegionInfo getSecondDaughter() {
775     return hri_b;
776   }
777 
778 }