View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.regionserver;
20  
21  import java.io.IOException;
22  import java.io.InterruptedIOException;
23  import java.util.ArrayList;
24  import java.util.List;
25  import java.util.ListIterator;
26  import java.util.Map;
27  import java.util.concurrent.Callable;
28  import java.util.concurrent.ExecutionException;
29  import java.util.concurrent.Executors;
30  import java.util.concurrent.Future;
31  import java.util.concurrent.ThreadFactory;
32  import java.util.concurrent.ThreadPoolExecutor;
33  import java.util.concurrent.TimeUnit;
34  
35  import org.apache.commons.logging.Log;
36  import org.apache.commons.logging.LogFactory;
37  import org.apache.hadoop.fs.Path;
38  import org.apache.hadoop.hbase.classification.InterfaceAudience;
39  import org.apache.hadoop.hbase.HConstants;
40  import org.apache.hadoop.hbase.HRegionInfo;
41  import org.apache.hadoop.hbase.Server;
42  import org.apache.hadoop.hbase.ServerName;
43  import org.apache.hadoop.hbase.client.Mutation;
44  import org.apache.hadoop.hbase.client.Put;
45  import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
46  import org.apache.hadoop.hbase.util.Bytes;
47  import org.apache.hadoop.hbase.util.CancelableProgressable;
48  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
49  import org.apache.hadoop.hbase.util.FSUtils;
50  import org.apache.hadoop.hbase.util.HasThread;
51  import org.apache.hadoop.hbase.util.Pair;
52  import org.apache.hadoop.hbase.util.PairOfSameType;
53  import org.apache.zookeeper.KeeperException;
54  
55  import com.google.common.util.concurrent.ThreadFactoryBuilder;
56  
/**
 * Executes region split as a "transaction".  Call {@link #prepare()} to setup
 * the transaction, {@link #execute(Server, RegionServerServices)} to run the
 * transaction and {@link #rollback(Server, RegionServerServices)} to cleanup if execute fails.
 *
 * <p>Here is an example of how you would use this class:
 * <pre>
 *  SplitTransaction st = new SplitTransaction(parent, midKey);
 *  if (!st.prepare()) return;
 *  try {
 *    st.execute(server, services);
 *  } catch (IOException ioe) {
 *    try {
 *      st.rollback(server, services);
 *      return;
 *    } catch (RuntimeException e) {
 *      myAbortable.abort("Failed split, abort");
 *    }
 *  }
 * </pre>
 * <p>This class is not thread safe.  The caller needs to ensure the split is
 * run by one thread only.
 */
80  @InterfaceAudience.Private
81  public class SplitTransaction {
82    private static final Log LOG = LogFactory.getLog(SplitTransaction.class);
83  
84    /*
85     * Region to split
86     */
87    private final HRegion parent;
88    private HRegionInfo hri_a;
89    private HRegionInfo hri_b;
90    private long fileSplitTimeout = 30000;
91  
92    /*
93     * Row to split around
94     */
95    private final byte [] splitrow;
96  
97    /**
98     * Types to add to the transaction journal.
99     * Each enum is a step in the split transaction. Used to figure how much
100    * we need to rollback.
101    */
102   static enum JournalEntryType {
103     /**
104      * Started
105      */
106     STARTED,
107     /**
108      * Prepared (after table lock)
109      */
110     PREPARED,
111     /**
112      * Before preSplit coprocessor hook
113      */
114     BEFORE_PRE_SPLIT_HOOK,
115     /**
116      * After preSplit coprocessor hook
117      */
118     AFTER_PRE_SPLIT_HOOK,
119     /**
120      * Set region as in transition, set it into SPLITTING state.
121      */
122     SET_SPLITTING,
123     /**
124      * We created the temporary split data directory.
125      */
126     CREATE_SPLIT_DIR,
127     /**
128      * Closed the parent region.
129      */
130     CLOSED_PARENT_REGION,
131     /**
132      * The parent has been taken out of the server's online regions list.
133      */
134     OFFLINED_PARENT,
135     /**
136      * Started in on creation of the first daughter region.
137      */
138     STARTED_REGION_A_CREATION,
139     /**
140      * Started in on the creation of the second daughter region.
141      */
142     STARTED_REGION_B_CREATION,
143     /**
144      * Opened the first daughter region
145      */
146     OPENED_REGION_A,
147     /**
148      * Opened the second daughter region
149      */
150     OPENED_REGION_B,
151     /**
152      * Before postSplit coprocessor hook
153      */
154     BEFORE_POST_SPLIT_HOOK,
155     /**
156      * After postSplit coprocessor hook
157      */
158     AFTER_POST_SPLIT_HOOK,
159     /**
160      * Point of no return.
161      * If we got here, then transaction is not recoverable other than by
162      * crashing out the regionserver.
163      */
164     PONR
165   }
166 
167   static class JournalEntry {
168     private JournalEntryType type;
169     private long timestamp;
170 
171     public JournalEntry(JournalEntryType type) {
172       this(type, EnvironmentEdgeManager.currentTime());
173     }
174 
175     public JournalEntry(JournalEntryType type, long timestamp) {
176       this.type = type;
177       this.timestamp = timestamp;
178     }
179 
180     @Override
181     public String toString() {
182       StringBuilder sb = new StringBuilder();
183       sb.append(type);
184       sb.append(" at ");
185       sb.append(timestamp);
186       return sb.toString();
187     }
188   }
189 
190   /*
191    * Journal of how far the split transaction has progressed.
192    */
193   private final List<JournalEntry> journal = new ArrayList<JournalEntry>();
194 
195   /**
196    * Constructor
197    * @param r Region to split
198    * @param splitrow Row to split around
199    */
200   public SplitTransaction(final HRegion r, final byte [] splitrow) {
201     this.parent = r;
202     this.splitrow = splitrow;
203     this.journal.add(new JournalEntry(JournalEntryType.STARTED));
204   }
205 
206   /**
207    * Does checks on split inputs.
208    * @return <code>true</code> if the region is splittable else
209    * <code>false</code> if it is not (e.g. its already closed, etc.).
210    */
211   public boolean prepare() {
212     if (!this.parent.isSplittable()) return false;
213     // Split key can be null if this region is unsplittable; i.e. has refs.
214     if (this.splitrow == null) return false;
215     HRegionInfo hri = this.parent.getRegionInfo();
216     parent.prepareToSplit();
217     // Check splitrow.
218     byte [] startKey = hri.getStartKey();
219     byte [] endKey = hri.getEndKey();
220     if (Bytes.equals(startKey, splitrow) ||
221         !this.parent.getRegionInfo().containsRow(splitrow)) {
222       LOG.info("Split row is not inside region key range or is equal to " +
223           "startkey: " + Bytes.toStringBinary(this.splitrow));
224       return false;
225     }
226     long rid = getDaughterRegionIdTimestamp(hri);
227     this.hri_a = new HRegionInfo(hri.getTable(), startKey, this.splitrow, false, rid);
228     this.hri_b = new HRegionInfo(hri.getTable(), this.splitrow, endKey, false, rid);
229     this.journal.add(new JournalEntry(JournalEntryType.PREPARED));
230     return true;
231   }
232 
233   /**
234    * Calculate daughter regionid to use.
235    * @param hri Parent {@link HRegionInfo}
236    * @return Daughter region id (timestamp) to use.
237    */
238   private static long getDaughterRegionIdTimestamp(final HRegionInfo hri) {
239     long rid = EnvironmentEdgeManager.currentTime();
240     // Regionid is timestamp.  Can't be less than that of parent else will insert
241     // at wrong location in hbase:meta (See HBASE-710).
242     if (rid < hri.getRegionId()) {
243       LOG.warn("Clock skew; parent regions id is " + hri.getRegionId() +
244         " but current time here is " + rid);
245       rid = hri.getRegionId() + 1;
246     }
247     return rid;
248   }
249 
250   private static IOException closedByOtherException = new IOException(
251       "Failed to close region: already closed by another thread");
252 
253   /**
254    * Prepare the regions and region files.
255    * @param server Hosting server instance.  Can be null when testing (won't try
256    * and update in zk if a null server)
257    * @param services Used to online/offline regions.
258    * @throws IOException If thrown, transaction failed.
259    *    Call {@link #rollback(Server, RegionServerServices)}
260    * @return Regions created
261    */
262   /* package */PairOfSameType<HRegion> createDaughters(final Server server,
263       final RegionServerServices services) throws IOException {
264     LOG.info("Starting split of region " + this.parent);
265     if ((server != null && server.isStopped()) ||
266         (services != null && services.isStopping())) {
267       throw new IOException("Server is stopped or stopping");
268     }
269     assert !this.parent.lock.writeLock().isHeldByCurrentThread():
270       "Unsafe to hold write lock while performing RPCs";
271 
272     journal.add(new JournalEntry(JournalEntryType.BEFORE_PRE_SPLIT_HOOK));
273 
274     // Coprocessor callback
275     if (this.parent.getCoprocessorHost() != null) {
276       // TODO: Remove one of these
277       this.parent.getCoprocessorHost().preSplit();
278       this.parent.getCoprocessorHost().preSplit(this.splitrow);
279     }
280 
281     journal.add(new JournalEntry(JournalEntryType.AFTER_PRE_SPLIT_HOOK));
282 
283     // If true, no cluster to write meta edits to or to update znodes in.
284     boolean testing = server == null? true:
285         server.getConfiguration().getBoolean("hbase.testing.nocluster", false);
286     this.fileSplitTimeout = testing ? this.fileSplitTimeout :
287         server.getConfiguration().getLong("hbase.regionserver.fileSplitTimeout",
288           this.fileSplitTimeout);
289 
290     PairOfSameType<HRegion> daughterRegions = stepsBeforePONR(server, services, testing);
291 
292     List<Mutation> metaEntries = new ArrayList<Mutation>();
293     if (this.parent.getCoprocessorHost() != null) {
294       if (this.parent.getCoprocessorHost().
295           preSplitBeforePONR(this.splitrow, metaEntries)) {
296         throw new IOException("Coprocessor bypassing region "
297             + this.parent.getRegionNameAsString() + " split.");
298       }
299       try {
300         for (Mutation p : metaEntries) {
301           HRegionInfo.parseRegionName(p.getRow());
302         }
303       } catch (IOException e) {
304         LOG.error("Row key of mutation from coprossor is not parsable as region name."
305             + "Mutations from coprocessor should only for hbase:meta table.");
306         throw e;
307       }
308     }
309 
310     // This is the point of no return.  Adding subsequent edits to .META. as we
311     // do below when we do the daughter opens adding each to .META. can fail in
312     // various interesting ways the most interesting of which is a timeout
313     // BUT the edits all go through (See HBASE-3872).  IF we reach the PONR
314     // then subsequent failures need to crash out this regionserver; the
315     // server shutdown processing should be able to fix-up the incomplete split.
316     // The offlined parent will have the daughters as extra columns.  If
317     // we leave the daughter regions in place and do not remove them when we
318     // crash out, then they will have their references to the parent in place
319     // still and the server shutdown fixup of .META. will point to these
320     // regions.
321     // We should add PONR JournalEntry before offlineParentInMeta,so even if
322     // OfflineParentInMeta timeout,this will cause regionserver exit,and then
323     // master ServerShutdownHandler will fix daughter & avoid data loss. (See
324     // HBase-4562).
325     this.journal.add(new JournalEntry(JournalEntryType.PONR));
326 
327     // Edit parent in meta.  Offlines parent region and adds splita and splitb
328     // as an atomic update. See HBASE-7721. This update to META makes the region
329     // will determine whether the region is split or not in case of failures.
330     // If it is successful, master will roll-forward, if not, master will rollback
331     // and assign the parent region.
332     if (services != null && !services.reportRegionStateTransition(TransitionCode.SPLIT_PONR,
333         parent.getRegionInfo(), hri_a, hri_b)) {
334       // Passed PONR, let SSH clean it up
335       throw new IOException("Failed to notify master that split passed PONR: "
336         + parent.getRegionInfo().getRegionNameAsString());
337     }
338     return daughterRegions;
339   }
340 
341   public PairOfSameType<HRegion> stepsBeforePONR(final Server server,
342       final RegionServerServices services, boolean testing) throws IOException {
343     if (services != null && !services.reportRegionStateTransition(TransitionCode.READY_TO_SPLIT,
344         parent.getRegionInfo(), hri_a, hri_b)) {
345       throw new IOException("Failed to get ok from master to split "
346         + parent.getRegionNameAsString());
347     }
348     this.journal.add(new JournalEntry(JournalEntryType.SET_SPLITTING));
349 
350     this.parent.getRegionFileSystem().createSplitsDir();
351     this.journal.add(new JournalEntry(JournalEntryType.CREATE_SPLIT_DIR));
352 
353     Map<byte[], List<StoreFile>> hstoreFilesToSplit = null;
354     Exception exceptionToThrow = null;
355     try{
356       hstoreFilesToSplit = this.parent.close(false);
357     } catch (Exception e) {
358       exceptionToThrow = e;
359     }
360     if (exceptionToThrow == null && hstoreFilesToSplit == null) {
361       // The region was closed by a concurrent thread.  We can't continue
362       // with the split, instead we must just abandon the split.  If we
363       // reopen or split this could cause problems because the region has
364       // probably already been moved to a different server, or is in the
365       // process of moving to a different server.
366       exceptionToThrow = closedByOtherException;
367     }
368     if (exceptionToThrow != closedByOtherException) {
369       this.journal.add(new JournalEntry(JournalEntryType.CLOSED_PARENT_REGION));
370     }
371     if (exceptionToThrow != null) {
372       if (exceptionToThrow instanceof IOException) throw (IOException)exceptionToThrow;
373       throw new IOException(exceptionToThrow);
374     }
375     if (!testing) {
376       services.removeFromOnlineRegions(this.parent, null);
377     }
378     this.journal.add(new JournalEntry(JournalEntryType.OFFLINED_PARENT));
379 
380     // TODO: If splitStoreFiles were multithreaded would we complete steps in
381     // less elapsed time?  St.Ack 20100920
382     //
383     // splitStoreFiles creates daughter region dirs under the parent splits dir
384     // Nothing to unroll here if failure -- clean up of CREATE_SPLIT_DIR will
385     // clean this up.
386     Pair<Integer, Integer> expectedReferences = splitStoreFiles(hstoreFilesToSplit);
387 
388     // Log to the journal that we are creating region A, the first daughter
389     // region.  We could fail halfway through.  If we do, we could have left
390     // stuff in fs that needs cleanup -- a storefile or two.  Thats why we
391     // add entry to journal BEFORE rather than AFTER the change.
392     this.journal.add(new JournalEntry(JournalEntryType.STARTED_REGION_A_CREATION));
393     assertReferenceFileCount(expectedReferences.getFirst(),
394         this.parent.getRegionFileSystem().getSplitsDir(this.hri_a));
395     HRegion a = this.parent.createDaughterRegionFromSplits(this.hri_a);
396     assertReferenceFileCount(expectedReferences.getFirst(),
397         new Path(this.parent.getRegionFileSystem().getTableDir(), this.hri_a.getEncodedName()));
398 
399     // Ditto
400     this.journal.add(new JournalEntry(JournalEntryType.STARTED_REGION_B_CREATION));
401     assertReferenceFileCount(expectedReferences.getSecond(),
402         this.parent.getRegionFileSystem().getSplitsDir(this.hri_b));
403     HRegion b = this.parent.createDaughterRegionFromSplits(this.hri_b);
404     assertReferenceFileCount(expectedReferences.getSecond(),
405         new Path(this.parent.getRegionFileSystem().getTableDir(), this.hri_b.getEncodedName()));
406 
407     return new PairOfSameType<HRegion>(a, b);
408   }
409 
410   void assertReferenceFileCount(int expectedReferenceFileCount, Path dir)
411       throws IOException {
412     if (expectedReferenceFileCount != 0 &&
413         expectedReferenceFileCount != FSUtils.getRegionReferenceFileCount(this.parent.getFilesystem(), dir)) {
414       throw new IOException("Failing split. Expected reference file count isn't equal.");
415     }
416   }
417 
418   /**
419    * Perform time consuming opening of the daughter regions.
420    * @param server Hosting server instance.  Can be null when testing
421    * @param services Used to online/offline regions.
422    * @param a first daughter region
423    * @param a second daughter region
424    * @throws IOException If thrown, transaction failed.
425    *          Call {@link #rollback(Server, RegionServerServices)}
426    */
427   /* package */void openDaughters(final Server server,
428       final RegionServerServices services, HRegion a, HRegion b)
429       throws IOException {
430     boolean stopped = server != null && server.isStopped();
431     boolean stopping = services != null && services.isStopping();
432     // TODO: Is this check needed here?
433     if (stopped || stopping) {
434       LOG.info("Not opening daughters " +
435           b.getRegionInfo().getRegionNameAsString() +
436           " and " +
437           a.getRegionInfo().getRegionNameAsString() +
438           " because stopping=" + stopping + ", stopped=" + stopped);
439     } else {
440       // Open daughters in parallel.
441       DaughterOpener aOpener = new DaughterOpener(server, a);
442       DaughterOpener bOpener = new DaughterOpener(server, b);
443       aOpener.start();
444       bOpener.start();
445       try {
446         aOpener.join();
447         if (aOpener.getException() == null) {
448           journal.add(new JournalEntry(JournalEntryType.OPENED_REGION_A));
449         }
450         bOpener.join();
451         if (bOpener.getException() == null) {
452           journal.add(new JournalEntry(JournalEntryType.OPENED_REGION_B));
453         }
454       } catch (InterruptedException e) {
455         throw (InterruptedIOException)new InterruptedIOException().initCause(e);
456       }
457       if (aOpener.getException() != null) {
458         throw new IOException("Failed " +
459           aOpener.getName(), aOpener.getException());
460       }
461       if (bOpener.getException() != null) {
462         throw new IOException("Failed " +
463           bOpener.getName(), bOpener.getException());
464       }
465       if (services != null) {
466         if (!services.reportRegionStateTransition(TransitionCode.SPLIT,
467             parent.getRegionInfo(), hri_a, hri_b)) {
468           throw new IOException("Failed to report split region to master: "
469             + parent.getRegionInfo().getShortNameToLog());
470         }
471         // Should add it to OnlineRegions
472         services.addToOnlineRegions(b);
473         services.addToOnlineRegions(a);
474       }
475     }
476   }
477 
478   /**
479    * Run the transaction.
480    * @param server Hosting server instance.  Can be null when testing
481    * @param services Used to online/offline regions.
482    * @throws IOException If thrown, transaction failed.
483    *          Call {@link #rollback(Server, RegionServerServices)}
484    * @return Regions created
485    * @throws IOException
486    * @see #rollback(Server, RegionServerServices)
487    */
488   public PairOfSameType<HRegion> execute(final Server server,
489       final RegionServerServices services)
490   throws IOException {
491     PairOfSameType<HRegion> regions = createDaughters(server, services);
492     if (this.parent.getCoprocessorHost() != null) {
493       this.parent.getCoprocessorHost().preSplitAfterPONR();
494     }
495     return stepsAfterPONR(server, services, regions);
496   }
497 
498   public PairOfSameType<HRegion> stepsAfterPONR(final Server server,
499       final RegionServerServices services, PairOfSameType<HRegion> regions)
500       throws IOException {
501     openDaughters(server, services, regions.getFirst(), regions.getSecond());
502     journal.add(new JournalEntry(JournalEntryType.BEFORE_POST_SPLIT_HOOK));
503     // Coprocessor callback
504     if (parent.getCoprocessorHost() != null) {
505       parent.getCoprocessorHost().postSplit(regions.getFirst(), regions.getSecond());
506     }
507     journal.add(new JournalEntry(JournalEntryType.AFTER_POST_SPLIT_HOOK));
508     return regions;
509   }
510 
511   public Put addLocation(final Put p, final ServerName sn, long openSeqNum) {
512     p.addImmutable(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER,
513       Bytes.toBytes(sn.getHostAndPort()));
514     p.addImmutable(HConstants.CATALOG_FAMILY, HConstants.STARTCODE_QUALIFIER,
515       Bytes.toBytes(sn.getStartcode()));
516     p.addImmutable(HConstants.CATALOG_FAMILY, HConstants.SEQNUM_QUALIFIER,
517         Bytes.toBytes(openSeqNum));
518     return p;
519   }
520 
521   /*
522    * Open daughter region in its own thread.
523    * If we fail, abort this hosting server.
524    */
525   class DaughterOpener extends HasThread {
526     private final Server server;
527     private final HRegion r;
528     private Throwable t = null;
529 
530     DaughterOpener(final Server s, final HRegion r) {
531       super((s == null? "null-services": s.getServerName()) +
532         "-daughterOpener=" + r.getRegionInfo().getEncodedName());
533       setDaemon(true);
534       this.server = s;
535       this.r = r;
536     }
537 
538     /**
539      * @return Null if open succeeded else exception that causes us fail open.
540      * Call it after this thread exits else you may get wrong view on result.
541      */
542     Throwable getException() {
543       return this.t;
544     }
545 
546     @Override
547     public void run() {
548       try {
549         openDaughterRegion(this.server, r);
550       } catch (Throwable t) {
551         this.t = t;
552       }
553     }
554   }
555 
556   /**
557    * Open daughter regions, add them to online list and update meta.
558    * @param server
559    * @param daughter
560    * @throws IOException
561    * @throws KeeperException
562    */
563   void openDaughterRegion(final Server server, final HRegion daughter)
564   throws IOException, KeeperException {
565     HRegionInfo hri = daughter.getRegionInfo();
566     LoggingProgressable reporter = server == null ? null
567         : new LoggingProgressable(hri, server.getConfiguration().getLong(
568             "hbase.regionserver.split.daughter.open.log.interval", 10000));
569     daughter.openHRegion(reporter);
570   }
571 
572   static class LoggingProgressable implements CancelableProgressable {
573     private final HRegionInfo hri;
574     private long lastLog = -1;
575     private final long interval;
576 
577     LoggingProgressable(final HRegionInfo hri, final long interval) {
578       this.hri = hri;
579       this.interval = interval;
580     }
581 
582     @Override
583     public boolean progress() {
584       long now = EnvironmentEdgeManager.currentTime();
585       if (now - lastLog > this.interval) {
586         LOG.info("Opening " + this.hri.getRegionNameAsString());
587         this.lastLog = now;
588       }
589       return true;
590     }
591   }
592 
593   /**
594    * Creates reference files for top and bottom half of the
595    * @param hstoreFilesToSplit map of store files to create half file references for.
596    * @return the number of reference files that were created.
597    * @throws IOException
598    */
599   private Pair<Integer, Integer> splitStoreFiles(
600       final Map<byte[], List<StoreFile>> hstoreFilesToSplit)
601       throws IOException {
602     if (hstoreFilesToSplit == null) {
603       // Could be null because close didn't succeed -- for now consider it fatal
604       throw new IOException("Close returned empty list of StoreFiles");
605     }
606     // The following code sets up a thread pool executor with as many slots as
607     // there's files to split. It then fires up everything, waits for
608     // completion and finally checks for any exception
609     int nbFiles = hstoreFilesToSplit.size();
610     if (nbFiles == 0) {
611       // no file needs to be splitted.
612       return new Pair<Integer, Integer>(0,0);
613     }
614     ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
615     builder.setNameFormat("StoreFileSplitter-%1$d");
616     ThreadFactory factory = builder.build();
617     ThreadPoolExecutor threadPool =
618       (ThreadPoolExecutor) Executors.newFixedThreadPool(nbFiles, factory);
619     List<Future<Pair<Path,Path>>> futures = new ArrayList<Future<Pair<Path,Path>>> (nbFiles);
620 
621     // Split each store file.
622     for (Map.Entry<byte[], List<StoreFile>> entry: hstoreFilesToSplit.entrySet()) {
623       for (StoreFile sf: entry.getValue()) {
624         StoreFileSplitter sfs = new StoreFileSplitter(entry.getKey(), sf);
625         futures.add(threadPool.submit(sfs));
626       }
627     }
628     // Shutdown the pool
629     threadPool.shutdown();
630 
631     // Wait for all the tasks to finish
632     try {
633       boolean stillRunning = !threadPool.awaitTermination(
634           this.fileSplitTimeout, TimeUnit.MILLISECONDS);
635       if (stillRunning) {
636         threadPool.shutdownNow();
637         // wait for the thread to shutdown completely.
638         while (!threadPool.isTerminated()) {
639           Thread.sleep(50);
640         }
641         throw new IOException("Took too long to split the" +
642             " files and create the references, aborting split");
643       }
644     } catch (InterruptedException e) {
645       throw (InterruptedIOException)new InterruptedIOException().initCause(e);
646     }
647 
648     int created_a = 0;
649     int created_b = 0;
650     // Look for any exception
651     for (Future<Pair<Path, Path>> future : futures) {
652       try {
653         Pair<Path, Path> p = future.get();
654         created_a += p.getFirst() != null ? 1 : 0;
655         created_b += p.getSecond() != null ? 1 : 0;
656       } catch (InterruptedException e) {
657         throw (InterruptedIOException) new InterruptedIOException().initCause(e);
658       } catch (ExecutionException e) {
659         throw new IOException(e);
660       }
661     }
662 
663     return new Pair<Integer, Integer>(created_a, created_b);
664   }
665 
666   private Pair<Path, Path> splitStoreFile(final byte[] family, final StoreFile sf) throws IOException {
667     HRegionFileSystem fs = this.parent.getRegionFileSystem();
668     String familyName = Bytes.toString(family);
669     Path path_a =
670         fs.splitStoreFile(this.hri_a, familyName, sf, this.splitrow, false,
671           this.parent.getSplitPolicy());
672     Path path_b =
673         fs.splitStoreFile(this.hri_b, familyName, sf, this.splitrow, true,
674           this.parent.getSplitPolicy());
675     return new Pair<Path,Path>(path_a, path_b);
676   }
677 
678   /**
679    * Utility class used to do the file splitting / reference writing
680    * in parallel instead of sequentially.
681    */
682   class StoreFileSplitter implements Callable<Pair<Path,Path>> {
683     private final byte[] family;
684     private final StoreFile sf;
685 
686     /**
687      * Constructor that takes what it needs to split
688      * @param family Family that contains the store file
689      * @param sf which file
690      */
691     public StoreFileSplitter(final byte[] family, final StoreFile sf) {
692       this.sf = sf;
693       this.family = family;
694     }
695 
696     public Pair<Path,Path> call() throws IOException {
697       return splitStoreFile(family, sf);
698     }
699   }
700 
701   /**
702    * @param server Hosting server instance (May be null when testing).
703    * @param services
704    * @throws IOException If thrown, rollback failed.  Take drastic action.
705    * @return True if we successfully rolled back, false if we got to the point
706    * of no return and so now need to abort the server to minimize damage.
707    */
708   @SuppressWarnings("deprecation")
709   public boolean rollback(final Server server, final RegionServerServices services)
710   throws IOException {
711     // Coprocessor callback
712     if (this.parent.getCoprocessorHost() != null) {
713       this.parent.getCoprocessorHost().preRollBackSplit();
714     }
715 
716     boolean result = true;
717     ListIterator<JournalEntry> iterator =
718       this.journal.listIterator(this.journal.size());
719     // Iterate in reverse.
720     while (iterator.hasPrevious()) {
721       JournalEntry je = iterator.previous();
722       switch(je.type) {
723 
724       case SET_SPLITTING:
725         if (services != null
726             && !services.reportRegionStateTransition(TransitionCode.SPLIT_REVERTED,
727                 parent.getRegionInfo(), hri_a, hri_b)) {
728           return false;
729         }
730         break;
731 
732       case CREATE_SPLIT_DIR:
733         this.parent.writestate.writesEnabled = true;
734         this.parent.getRegionFileSystem().cleanupSplitsDir();
735         break;
736 
737       case CLOSED_PARENT_REGION:
738         try {
739           // So, this returns a seqid but if we just closed and then reopened, we
740           // should be ok. On close, we flushed using sequenceid obtained from
741           // hosting regionserver so no need to propagate the sequenceid returned
742           // out of initialize below up into regionserver as we normally do.
743           // TODO: Verify.
744           this.parent.initialize();
745         } catch (IOException e) {
746           LOG.error("Failed rollbacking CLOSED_PARENT_REGION of region " +
747             this.parent.getRegionNameAsString(), e);
748           throw new RuntimeException(e);
749         }
750         break;
751 
752       case STARTED_REGION_A_CREATION:
753         this.parent.getRegionFileSystem().cleanupDaughterRegion(this.hri_a);
754         break;
755 
756       case STARTED_REGION_B_CREATION:
757         this.parent.getRegionFileSystem().cleanupDaughterRegion(this.hri_b);
758         break;
759 
760       case OFFLINED_PARENT:
761         if (services != null) services.addToOnlineRegions(this.parent);
762         break;
763 
764       case PONR:
765         // We got to the point-of-no-return so we need to just abort. Return
766         // immediately.  Do not clean up created daughter regions.  They need
767         // to be in place so we don't delete the parent region mistakenly.
768         // See HBASE-3872.
769         return false;
770 
771       // Informational only cases
772       case STARTED:
773       case PREPARED:
774       case BEFORE_PRE_SPLIT_HOOK:
775       case AFTER_PRE_SPLIT_HOOK:
776       case BEFORE_POST_SPLIT_HOOK:
777       case AFTER_POST_SPLIT_HOOK:
778       case OPENED_REGION_A:
779       case OPENED_REGION_B:
780         break;
781 
782       default:
783         throw new RuntimeException("Unhandled journal entry: " + je);
784       }
785     }
786     // Coprocessor callback
787     if (this.parent.getCoprocessorHost() != null) {
788       this.parent.getCoprocessorHost().postRollBackSplit();
789     }
790     return result;
791   }
792 
793   HRegionInfo getFirstDaughter() {
794     return hri_a;
795   }
796 
797   HRegionInfo getSecondDaughter() {
798     return hri_b;
799   }
800 
801   List<JournalEntry> getJournal() {
802     return journal;
803   }
804 }