1   /**
2    * Copyright 2010 The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  package org.apache.hadoop.hbase.regionserver;
21  
22  import java.io.IOException;
23  import java.util.ArrayList;
24  import java.util.List;
25  import java.util.ListIterator;
26  import java.util.concurrent.Callable;
27  import java.util.concurrent.ExecutionException;
28  import java.util.concurrent.Executors;
29  import java.util.concurrent.Future;
30  import java.util.concurrent.ThreadFactory;
31  import java.util.concurrent.ThreadPoolExecutor;
32  import java.util.concurrent.TimeUnit;
33  
34  import org.apache.commons.logging.Log;
35  import org.apache.commons.logging.LogFactory;
36  import org.apache.hadoop.conf.Configuration;
37  import org.apache.hadoop.fs.FileStatus;
38  import org.apache.hadoop.fs.FileSystem;
39  import org.apache.hadoop.fs.Path;
40  import org.apache.hadoop.hbase.HBaseFileSystem;
41  import org.apache.hadoop.hbase.HRegionInfo;
42  import org.apache.hadoop.hbase.Server;
43  import org.apache.hadoop.hbase.ServerName;
44  import org.apache.hadoop.hbase.catalog.MetaEditor;
45  import org.apache.hadoop.hbase.executor.EventHandler.EventType;
46  import org.apache.hadoop.hbase.executor.RegionTransitionData;
47  import org.apache.hadoop.hbase.io.Reference.Range;
48  import org.apache.hadoop.hbase.util.Bytes;
49  import org.apache.hadoop.hbase.util.CancelableProgressable;
50  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
51  import org.apache.hadoop.hbase.util.FSUtils;
52  import org.apache.hadoop.hbase.util.HasThread;
53  import org.apache.hadoop.hbase.util.PairOfSameType;
54  import org.apache.hadoop.hbase.util.Writables;
55  import org.apache.hadoop.hbase.zookeeper.ZKAssign;
56  import org.apache.hadoop.hbase.zookeeper.ZKUtil;
57  import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
58  import org.apache.zookeeper.KeeperException;
59  import org.apache.zookeeper.KeeperException.NodeExistsException;
60  
61  import com.google.common.util.concurrent.ThreadFactoryBuilder;
62  
63  /**
64   * Executes region split as a "transaction".  Call {@link #prepare()} to set up
65   * the transaction, {@link #execute(Server, RegionServerServices)} to run the
66   * transaction, and {@link #rollback(Server, RegionServerServices)} to clean up if execute fails.
67   *
68   * <p>Here is an example of how you would use this class:
69   * <pre>
70   *  SplitTransaction st = new SplitTransaction(parent, midKey);
71   *  if (!st.prepare()) return;
72   *  try {
73   *    st.execute(server, services);
74   *  } catch (IOException ioe) {
75   *    try {
76   *      st.rollback(server, services);
77   *      return;
78   *    } catch (RuntimeException e) {
79   *      myAbortable.abort("Failed split, abort");
80   *    }
81   *  }
82   * </pre>
83   * <p>This class is not thread safe.  Caller needs to ensure the split is run by
84   * one thread only.
85   */
86  public class SplitTransaction {
87    private static final Log LOG = LogFactory.getLog(SplitTransaction.class);
88    private static final String SPLITDIR = ".splits";
89  
90    /*
91     * Region to split
92     */
93    private final HRegion parent;
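     /*
      * Daughter region infos: hri_a covers [startKey, splitrow) and hri_b covers
      * [splitrow, endKey).  Both are set up in prepare().
      */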
94    private HRegionInfo hri_a;
95    private HRegionInfo hri_b;
96    private Path splitdir;
97    private long fileSplitTimeout = 30000;
98    private int znodeVersion = -1;
99  
100   /*
101    * Row to split around
102    */
103   private final byte [] splitrow;
104 
105   /**
106    * Types to add to the transaction journal.
107    * Each enum is a step in the split transaction.  Used to figure out how much
108    * we need to roll back.
109    */
110   enum JournalEntry {
111     /**
112      * Set region as in transition, set it into SPLITTING state.
113      */
114     SET_SPLITTING_IN_ZK,
115     /**
116      * We created the temporary split data directory.
117      */
118     CREATE_SPLIT_DIR,
119     /**
120      * Closed the parent region.
121      */
122     CLOSED_PARENT_REGION,
123     /**
124      * The parent has been taken out of the server's online regions list.
125      */
126     OFFLINED_PARENT,
127     /**
128      * Started in on creation of the first daughter region.
129      */
130     STARTED_REGION_A_CREATION,
131     /**
132      * Started in on the creation of the second daughter region.
133      */
134     STARTED_REGION_B_CREATION,
135     /**
136      * Point of no return.
137      * If we get here, the transaction is not recoverable other than by
138      * crashing out the regionserver.
139      */
140     PONR
141   }
142 
143   /*
144    * Journal of how far the split transaction has progressed.
145    */
146   private final List<JournalEntry> journal = new ArrayList<JournalEntry>();
147 
148   /**
149    * Constructor
150    * @param r Region to split
151    * @param splitrow Row to split around
152    */
153   public SplitTransaction(final HRegion r, final byte [] splitrow) {
154     this.parent = r;
155     this.splitrow = splitrow;
156     this.splitdir = getSplitDir(this.parent);
157   }
158 
159   /**
160    * Does checks on split inputs.
161    * @return <code>true</code> if the region is splittable else
162    * <code>false</code> if it is not (e.g. it is already closed).
163    */
164   public boolean prepare() {
165     if (!this.parent.isSplittable()) return false;
166     // Split key can be null if this region is unsplittable; i.e. has refs.
167     if (this.splitrow == null) return false;
168     HRegionInfo hri = this.parent.getRegionInfo();
169     parent.prepareToSplit();
170     // Check splitrow.
171     byte [] startKey = hri.getStartKey();
172     byte [] endKey = hri.getEndKey();
173     if (Bytes.equals(startKey, splitrow) ||
174         !this.parent.getRegionInfo().containsRow(splitrow)) {
175       LOG.info("Split row is not inside region key range or is equal to " +
176           "startkey: " + Bytes.toStringBinary(this.splitrow));
177       return false;
178     }
179     long rid = getDaughterRegionIdTimestamp(hri);
180     this.hri_a = new HRegionInfo(hri.getTableName(), startKey, this.splitrow,
181       false, rid);
182     this.hri_b = new HRegionInfo(hri.getTableName(), this.splitrow, endKey,
183       false, rid);
184     return true;
185   }
186 
187   /**
188    * Calculate daughter regionid to use.
189    * @param hri Parent {@link HRegionInfo}
190    * @return Daughter region id (timestamp) to use.
191    */
192   private static long getDaughterRegionIdTimestamp(final HRegionInfo hri) {
193     long rid = EnvironmentEdgeManager.currentTimeMillis();
194     // The region id is a timestamp.  It can't be less than that of the parent,
195     // else it will insert at the wrong location in .META. (See HBASE-710).
196     if (rid < hri.getRegionId()) {
197       LOG.warn("Clock skew; parent region's id is " + hri.getRegionId() +
198         " but current time here is " + rid);
199       rid = hri.getRegionId() + 1;
200     }
201     return rid;
202   }
203 
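      /*
       * Pre-allocated sentinel exception, compared by identity in createDaughters
       * to detect that the parent was already closed by another thread.
       */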
204   private static IOException closedByOtherException = new IOException(
205       "Failed to close region: already closed by another thread");
206   
207   /**
208    * Prepare the regions and region files.
209    * @param server Hosting server instance.  Can be null when testing (we won't
210    * try to update zk if the server is null).
211    * @param services Used to online/offline regions.
212    * @throws IOException If thrown, transaction failed. Call {@link #rollback(Server, RegionServerServices)}
213    * @return Regions created
214    */
215   /* package */PairOfSameType<HRegion> createDaughters(final Server server,
216       final RegionServerServices services) throws IOException {
217     LOG.info("Starting split of region " + this.parent);
218     if ((server != null && server.isStopped()) ||
219         (services != null && services.isStopping())) {
220       throw new IOException("Server is stopped or stopping");
221     }
222     assert !this.parent.lock.writeLock().isHeldByCurrentThread(): "Unsafe to hold write lock while performing RPCs";
223 
224     // Coprocessor callback
225     if (this.parent.getCoprocessorHost() != null) {
226       this.parent.getCoprocessorHost().preSplit();
227     }
228 
229     // If true, no cluster to write meta edits to or to update znodes in.
230     boolean testing = server == null? true:
231       server.getConfiguration().getBoolean("hbase.testing.nocluster", false);
232     this.fileSplitTimeout = testing ? this.fileSplitTimeout :
233       server.getConfiguration().getLong("hbase.regionserver.fileSplitTimeout",
234           this.fileSplitTimeout);
235 
236     // Set ephemeral SPLITTING znode up in zk.  Mocked servers sometimes don't
237     // have zookeeper so don't do zk stuff if server or zookeeper is null
238     if (server != null && server.getZooKeeper() != null) {
239       try {
240         createNodeSplitting(server.getZooKeeper(),
241           this.parent.getRegionInfo(), server.getServerName());
242       } catch (KeeperException e) {
243         throw new IOException("Failed creating SPLITTING znode on " +
244           this.parent.getRegionNameAsString(), e);
245       }
246     }
247     this.journal.add(JournalEntry.SET_SPLITTING_IN_ZK);
248     if (server != null && server.getZooKeeper() != null) {
249       try {
250         // Transition the node from SPLITTING to SPLITTING (a self-transition) after creating it.
251         // The master will get the node-changed callback only if the transition is successful.
252         // Note that if the transition fails, the rollback will delete the created znode.
253         // TODO: Maybe we can add a new state to the znode and handle that state in case of success/failure.
254         this.znodeVersion = transitionNodeSplitting(server.getZooKeeper(),
255             this.parent.getRegionInfo(), server.getServerName(), -1);
256       } catch (KeeperException e) {
257         throw new IOException("Failed setting SPLITTING znode on "
258             + this.parent.getRegionNameAsString(), e);
259       }
260     }
261     createSplitDir(this.parent.getFilesystem(), this.splitdir);
262     this.journal.add(JournalEntry.CREATE_SPLIT_DIR);
263  
264     List<StoreFile> hstoreFilesToSplit = null;
265     Exception exceptionToThrow = null;
266     try{
267       hstoreFilesToSplit = this.parent.close(false);
268     } catch (Exception e) {
269       exceptionToThrow = e;
270     }
271     if (exceptionToThrow == null && hstoreFilesToSplit == null) {
272       // The region was closed by a concurrent thread.  We can't continue
273       // with the split, instead we must just abandon the split.  If we
274       // reopen or split this could cause problems because the region has
275       // probably already been moved to a different server, or is in the
276       // process of moving to a different server.
277       exceptionToThrow = closedByOtherException;
278     }
279     if (exceptionToThrow != closedByOtherException) {
280       this.journal.add(JournalEntry.CLOSED_PARENT_REGION);
281     }
282     if (exceptionToThrow != null) {
283       if (exceptionToThrow instanceof IOException) throw (IOException)exceptionToThrow;
284       throw new IOException(exceptionToThrow);
285     }
286 
287     if (!testing) {
288       services.removeFromOnlineRegions(this.parent.getRegionInfo().getEncodedName());
289     }
290     this.journal.add(JournalEntry.OFFLINED_PARENT);
291 
292     // TODO: If splitStoreFiles were multithreaded would we complete steps in
293     // less elapsed time?  St.Ack 20100920
294     //
295     // splitStoreFiles creates daughter region dirs under the parent splits dir
296     // Nothing to unroll here on failure -- the cleanup of CREATE_SPLIT_DIR will
297     // clean this up.
298     splitStoreFiles(this.splitdir, hstoreFilesToSplit);
299 
300     // Log to the journal that we are creating region A, the first daughter
301     // region.  We could fail halfway through.  If we do, we could have left
302     // stuff in the fs that needs cleanup -- a storefile or two.  That's why we
303     // add entry to journal BEFORE rather than AFTER the change.
304     this.journal.add(JournalEntry.STARTED_REGION_A_CREATION);
305     HRegion a = createDaughterRegion(this.hri_a, this.parent.rsServices);
306 
307     // Ditto
308     this.journal.add(JournalEntry.STARTED_REGION_B_CREATION);
309     HRegion b = createDaughterRegion(this.hri_b, this.parent.rsServices);
310 
311     // This is the point of no return.  Adding subsequent edits to .META. as we
312     // do below when we open the daughters (adding each to .META.) can fail in
313     // various interesting ways, the most interesting of which is a timeout,
314     // BUT the edits all go through (See HBASE-3872).  If we reach the PONR,
315     // then subsequent failures need to crash out this regionserver; the
316     // server shutdown processing should be able to fix up the incomplete split.
317     // The offlined parent will have the daughters as extra columns.  If
318     // we leave the daughter regions in place and do not remove them when we
319     // crash out, then they will still have their references to the parent in
320     // place, and the server shutdown fixup of .META. will point to these
321     // regions.
322     // We add the PONR JournalEntry before offlineParentInMeta, so even if
323     // offlineParentInMeta times out, the regionserver will exit and the
324     // master's ServerShutdownHandler will fix up the daughters and avoid
325     // data loss. (See HBASE-4562.)
326     this.journal.add(JournalEntry.PONR);
327 
328     // Edit parent in meta.  Offlines parent region and adds splita and splitb.
329     if (!testing) {
330       MetaEditor.offlineParentInMeta(server.getCatalogTracker(),
331         this.parent.getRegionInfo(), a.getRegionInfo(), b.getRegionInfo());
332     }
333     return new PairOfSameType<HRegion>(a, b);
334   }
335 
336   /**
337    * Perform the time-consuming opening of the daughter regions.
338    * @param server Hosting server instance.  Can be null when testing (we won't
339    * try to update zk if the server is null).
340    * @param services Used to online/offline regions.
341    * @param a first daughter region
342    * @param b second daughter region
343    * @throws IOException If thrown, transaction failed. Call {@link #rollback(Server, RegionServerServices)}
344    */
345   /* package */void openDaughters(final Server server,
346       final RegionServerServices services, HRegion a, HRegion b)
347       throws IOException {
348     boolean stopped = server != null && server.isStopped();
349     boolean stopping = services != null && services.isStopping();
350     // TODO: Is this check needed here?
351     if (stopped || stopping) {
352       LOG.info("Not opening daughters " +
353           b.getRegionInfo().getRegionNameAsString() +
354           " and " +
355           a.getRegionInfo().getRegionNameAsString() +
356           " because stopping=" + stopping + ", stopped=" + stopped);
357     } else {
358       // Open daughters in parallel.
359       DaughterOpener aOpener = new DaughterOpener(server, a);
360       DaughterOpener bOpener = new DaughterOpener(server, b);
361       aOpener.start();
362       bOpener.start();
363       try {
364         aOpener.join();
365         bOpener.join();
366       } catch (InterruptedException e) {
367         Thread.currentThread().interrupt();
368         throw new IOException("Interrupted " + e.getMessage());
369       }
370       if (aOpener.getException() != null) {
371         throw new IOException("Failed " +
372           aOpener.getName(), aOpener.getException());
373       }
374       if (bOpener.getException() != null) {
375         throw new IOException("Failed " +
376           bOpener.getName(), bOpener.getException());
377       }
378       if (services != null) {
379         try {
380           // add 2nd daughter first (see HBASE-4335)
381           services.postOpenDeployTasks(b, server.getCatalogTracker(), true);
382           // Should add it to OnlineRegions
383           services.addToOnlineRegions(b);
384           services.postOpenDeployTasks(a, server.getCatalogTracker(), true);
385           services.addToOnlineRegions(a);
386         } catch (KeeperException ke) {
387           throw new IOException(ke);
388         }
389       }
390     }
391   }
392 
393   /**
394    * Finish off the split transaction by transitioning the znode.
395    * @param server Hosting server instance.  Can be null when testing (we won't
396    * try to update zk if the server is null).
397    * @param services Used to online/offline regions.
398    * @param a first daughter region
399    * @param b second daughter region
400    * @throws IOException If thrown, transaction failed. Call {@link #rollback(Server, RegionServerServices)}
401    */
402   /* package */void transitionZKNode(final Server server,
403       final RegionServerServices services, HRegion a, HRegion b)
404       throws IOException {
405     // Tell master about split by updating zk.  If we fail, abort.
406     if (server != null && server.getZooKeeper() != null) {
407       try {
408         this.znodeVersion = transitionNodeSplit(server.getZooKeeper(),
409           parent.getRegionInfo(), a.getRegionInfo(), b.getRegionInfo(),
410           server.getServerName(), this.znodeVersion);
411 
412         int spins = 0;
413         // Now wait for the master to process the split. We know it's done
414         // when the znode is deleted. The reason we keep tickling the znode is
415         // that it's possible for the master to miss an event.
416         do {
417           if (spins % 10 == 0) {
418             LOG.debug("Still waiting on the master to process the split for " +
419                 this.parent.getRegionInfo().getEncodedName());
420           }
421           Thread.sleep(100);
422           // When this returns -1 it means the znode doesn't exist
423           this.znodeVersion = tickleNodeSplit(server.getZooKeeper(),
424             parent.getRegionInfo(), a.getRegionInfo(), b.getRegionInfo(),
425             server.getServerName(), this.znodeVersion);
426           spins++;
427         } while (this.znodeVersion != -1 && !server.isStopped()
428             && !services.isStopping());
429       } catch (Exception e) {
430         if (e instanceof InterruptedException) {
431           Thread.currentThread().interrupt();
432         }
433         throw new IOException("Failed telling master about split", e);
434       }
435     }
436 
437     // Coprocessor callback
438     if (this.parent.getCoprocessorHost() != null) {
439       this.parent.getCoprocessorHost().postSplit(a,b);
440     }
441 
442     // Leaving here, the splitdir with its dross will be in place but since the
443     // split was successful, just leave it; it'll be cleaned when parent is
444     // deleted and cleaned up.
445   }
446 
447   /**
448    * Run the transaction.
449    * @param server Hosting server instance.  Can be null when testing (we won't
450    * try to update zk if the server is null).
451    * @param services Used to online/offline regions.
452    * @return Regions created
453    * @throws IOException If thrown, transaction failed. Call
454    * {@link #rollback(Server, RegionServerServices)}.
455    * @see #rollback(Server, RegionServerServices)
456    */
457   public PairOfSameType<HRegion> execute(final Server server,
458       final RegionServerServices services)
459   throws IOException {
460     PairOfSameType<HRegion> regions = createDaughters(server, services);
461     openDaughters(server, services, regions.getFirst(), regions.getSecond());
462     transitionZKNode(server, services, regions.getFirst(), regions.getSecond());
463     return regions;
464   }
465 
466   /*
467    * Open daughter region in its own thread.
468    * If we fail, abort this hosting server.
469    */
470   class DaughterOpener extends HasThread {
471     private final Server server;
472     private final HRegion r;
473     private Throwable t = null;
474 
475     DaughterOpener(final Server s, final HRegion r) {
476       super((s == null? "null-services": s.getServerName()) +
477         "-daughterOpener=" + r.getRegionInfo().getEncodedName());
478       setDaemon(true);
479       this.server = s;
480       this.r = r;
481     }
482 
483     /**
484      * @return Null if the open succeeded, else the exception that caused the open to fail.
485      * Call this after the thread exits, else you may get a stale view of the result.
486      */
487     Throwable getException() {
488       return this.t;
489     }
490 
491     @Override
492     public void run() {
493       try {
494         openDaughterRegion(this.server, r);
495       } catch (Throwable t) {
496         this.t = t;
497       }
498     }
499   }
500 
501   /**
502    * Open a daughter region.
503    * @param server Hosting server instance.  Can be null when testing; when
504    * null, no progress reporter is used.
505    * @param daughter Daughter region to open.
506    * @throws IOException
507    * @throws KeeperException
508    */
509   void openDaughterRegion(final Server server, final HRegion daughter)
510   throws IOException, KeeperException {
511     HRegionInfo hri = daughter.getRegionInfo();
512     LoggingProgressable reporter = server == null? null:
513       new LoggingProgressable(hri, server.getConfiguration());
514     daughter.openHRegion(reporter);
515   }
516 
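      /**
       * CancelableProgressable that logs the progress of a daughter region open
       * at an interval set by "hbase.regionserver.split.daughter.open.log.interval"
       * (default 10000 ms).
       */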
517   static class LoggingProgressable implements CancelableProgressable {
518     private final HRegionInfo hri;
519     private long lastLog = -1;
520     private final long interval;
521 
522     LoggingProgressable(final HRegionInfo hri, final Configuration c) {
523       this.hri = hri;
524       this.interval = c.getLong("hbase.regionserver.split.daughter.open.log.interval",
525         10000);
526     }
527 
528     @Override
529     public boolean progress() {
530       long now = System.currentTimeMillis();
531       if (now - lastLog > this.interval) {
532         LOG.info("Opening " + this.hri.getRegionNameAsString());
533         this.lastLog = now;
534       }
535       return true;
536     }
537   }
538 
539   private static Path getSplitDir(final HRegion r) {
540     return new Path(r.getRegionDir(), SPLITDIR);
541   }
542 
543   /**
544    * @param fs Filesystem to use
545    * @param splitdir Directory to store temporary split data in
546    * @throws IOException If we fail to delete a pre-existing <code>splitdir</code>
547    * or fail to create it.
548    * @see #cleanupSplitDir(FileSystem, Path)
549    */
550   void createSplitDir(final FileSystem fs, final Path splitdir)
551   throws IOException {
552     if (fs.exists(splitdir)) {
553       LOG.info("The " + splitdir
554           + " directory exists.  Hence deleting it to recreate it");
555       if (!HBaseFileSystem.deleteDirFromFileSystem(fs, splitdir)) {
556         throw new IOException("Failed deletion of " + splitdir
557             + " before creating them again.");
558       }
559     }
560     if (!HBaseFileSystem.makeDirOnFileSystem(fs, splitdir))
561         throw new IOException("Failed create of " + splitdir);
562   }
563 
564   private static void cleanupSplitDir(final FileSystem fs, final Path splitdir)
565   throws IOException {
566     // Splitdir may have been cleaned up by reopen of the parent dir.
567     deleteDir(fs, splitdir, false);
568   }
569 
570   /**
571    * @param fs Filesystem to use
572    * @param dir Directory to delete
573    * @param mustPreExist If true, we'll throw exception if <code>dir</code>
574    * does not preexist, else we'll just pass.
575    * @throws IOException Thrown if we fail to delete passed <code>dir</code>
576    */
577   private static void deleteDir(final FileSystem fs, final Path dir,
578       final boolean mustPreExist)
579   throws IOException {
580     if (!fs.exists(dir)) {
581       if (mustPreExist) throw new IOException(dir.toString() + " does not exist!");
582     } else if (!HBaseFileSystem.deleteDirFromFileSystem(fs, dir)) {
583       throw new IOException("Failed delete of " + dir);
584     }
585   }
586 
587   private void splitStoreFiles(final Path splitdir,
588     final List<StoreFile> hstoreFilesToSplit)
589   throws IOException {
590     if (hstoreFilesToSplit == null) {
591       // Could be null because close didn't succeed -- for now consider it fatal
592       throw new IOException("Close returned empty list of StoreFiles");
593     }
594     // The following code sets up a thread pool executor with as many slots as
595     // there are files to split. It then fires up everything, waits for
596     // completion and finally checks for any exception
597     int nbFiles = hstoreFilesToSplit.size();
598     if (nbFiles == 0) {
599       // no files need to be split.
600       return;
601     }
602     LOG.info("Preparing to split " + nbFiles + " storefiles for region " + this.parent);
603     ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
604     builder.setNameFormat("StoreFileSplitter-%1$d");
605     ThreadFactory factory = builder.build();
606     ThreadPoolExecutor threadPool =
607       (ThreadPoolExecutor) Executors.newFixedThreadPool(nbFiles, factory);
608     List<Future<Void>> futures = new ArrayList<Future<Void>>(nbFiles);
609 
610      // Split each store file.
611     for (StoreFile sf: hstoreFilesToSplit) {
612       //splitStoreFile(sf, splitdir);
613       StoreFileSplitter sfs = new StoreFileSplitter(sf, splitdir);
614       futures.add(threadPool.submit(sfs));
615     }
616     // Shutdown the pool
617     threadPool.shutdown();
618 
619     // Wait for all the tasks to finish
620     try {
621       boolean stillRunning = !threadPool.awaitTermination(
622           this.fileSplitTimeout, TimeUnit.MILLISECONDS);
623       if (stillRunning) {
624         threadPool.shutdownNow();
625         // wait for the thread to shutdown completely.
626         while (!threadPool.isTerminated()) {
627           Thread.sleep(50);
628         }
629         throw new IOException("Took too long to split the" +
630             " files and create the references, aborting split");
631       }
632     } catch (InterruptedException e) {
633       Thread.currentThread().interrupt();
634       throw new IOException("Interrupted while waiting for file splitters", e);
635     }
636 
637     // Look for any exception
638     for (Future<Void> future: futures) {
639       try {
640         future.get();
641       } catch (InterruptedException e) {
642         Thread.currentThread().interrupt();
643         throw new IOException(
644             "Interrupted while trying to get the results of file splitters", e);
645       } catch (ExecutionException e) {
646         throw new IOException(e);
647       }
648     }
649   }
650 
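      /*
       * Write the bottom (daughter a) and top (daughter b) reference files for a
       * single store file into the corresponding store dirs under the split dir.
       */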
651   private void splitStoreFile(final StoreFile sf, final Path splitdir)
652   throws IOException {
653     FileSystem fs = this.parent.getFilesystem();
654     byte [] family = sf.getFamily();
655     String encoded = this.hri_a.getEncodedName();
656     Path storedir = Store.getStoreHomedir(splitdir, encoded, family);
657     StoreFile.split(fs, storedir, sf, this.splitrow, Range.bottom);
658     encoded = this.hri_b.getEncodedName();
659     storedir = Store.getStoreHomedir(splitdir, encoded, family);
660     StoreFile.split(fs, storedir, sf, this.splitrow, Range.top);
661   }
662 
663   /**
664    * Utility class used to do the file splitting / reference writing
665    * in parallel instead of sequentially.
666    */
667   class StoreFileSplitter implements Callable<Void> {
668 
669     private final StoreFile sf;
670     private final Path splitdir;
671 
672     /**
673      * Constructor that takes what it needs to split
674      * @param sf which file
675      * @param splitdir where the splitting is done
676      */
677     public StoreFileSplitter(final StoreFile sf, final Path splitdir) {
678       this.sf = sf;
679       this.splitdir = splitdir;
680     }
681 
682     public Void call() throws IOException {
683       splitStoreFile(sf, splitdir);
684       return null;
685     }
686   }
687 
688   /**
689    * @param hri Spec. for daughter region to open.
690    * @param rsServices Region server services the daughter region should use.
691    * @return Created daughter HRegion.
692    * @throws IOException
693    * @see #cleanupDaughterRegion(FileSystem, Path, HRegionInfo)
694    */
695   HRegion createDaughterRegion(final HRegionInfo hri,
696       final RegionServerServices rsServices)
697   throws IOException {
698     // Package private so unit tests have access.
699     FileSystem fs = this.parent.getFilesystem();
700     Path regionDir = getSplitDirForDaughter(this.parent.getFilesystem(),
701       this.splitdir, hri);
702     HRegion r = HRegion.newHRegion(this.parent.getTableDir(),
703       this.parent.getLog(), fs, this.parent.getBaseConf(),
704       hri, this.parent.getTableDesc(), rsServices);
705     long halfParentReadRequestCount = this.parent.getReadRequestsCount() / 2;
706     r.readRequestsCount.set(halfParentReadRequestCount);
707     r.setOpMetricsReadRequestCount(halfParentReadRequestCount);
708     long halfParentWriteRequest = this.parent.getWriteRequestsCount() / 2;
709     r.writeRequestsCount.set(halfParentWriteRequest);
710     r.setOpMetricsWriteRequestCount(halfParentWriteRequest);    
711     HRegion.moveInitialFilesIntoPlace(fs, regionDir, r.getRegionDir());
712     return r;
713   }
714 
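      /*
       * Delete the named daughter region's directory under the table dir, if it
       * exists.  Used by rollback and by cleanupAnySplitDetritus.
       */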
715   private static void cleanupDaughterRegion(final FileSystem fs,
716     final Path tabledir, final String encodedName)
717   throws IOException {
718     Path regiondir = HRegion.getRegionDir(tabledir, encodedName);
719     // Dir may not preexist.
720     deleteDir(fs, regiondir, false);
721   }
722 
723   /*
724    * Get a daughter's directory in the splits dir.  The splits dir is under
725    * the parent region's directory.
726    * @param fs
727    * @param splitdir
728    * @param hri
729    * @return Path to daughter split dir.
730    * @throws IOException
731    */
732   private static Path getSplitDirForDaughter(final FileSystem fs,
733       final Path splitdir, final HRegionInfo hri)
734   throws IOException {
735     return new Path(splitdir, hri.getEncodedName());
736   }
737 
738   /**
739    * @param server Hosting server instance (May be null when testing).
740    * @param services
741    * @throws IOException If thrown, rollback failed.  Take drastic action.
742    * @return True if we successfully rolled back, false if we got to the point
743    * of no return and so now need to abort the server to minimize damage.
744    */
745   public boolean rollback(final Server server, final RegionServerServices services)
746   throws IOException {
747     boolean result = true;
748     FileSystem fs = this.parent.getFilesystem();
749     ListIterator<JournalEntry> iterator =
750       this.journal.listIterator(this.journal.size());
751     // Iterate in reverse.
752     while (iterator.hasPrevious()) {
753       JournalEntry je = iterator.previous();
754       switch(je) {
755       
756       case SET_SPLITTING_IN_ZK:
757         if (server != null && server.getZooKeeper() != null) {
758           cleanZK(server, this.parent.getRegionInfo());
759         }
760         break;
761 
762       case CREATE_SPLIT_DIR:
763         this.parent.writestate.writesEnabled = true;
764         cleanupSplitDir(fs, this.splitdir);
765         break;
766 
767       case CLOSED_PARENT_REGION:
768         try {
769           // So, this returns a seqid but if we just closed and then reopened, we
770           // should be ok. On close, we flushed using sequenceid obtained from
771           // hosting regionserver so no need to propagate the sequenceid returned
772           // out of initialize below up into regionserver as we normally do.
773           // TODO: Verify.
774           this.parent.initialize();
775         } catch (IOException e) {
776           LOG.error("Failed rolling back CLOSED_PARENT_REGION of region " +
777             this.parent.getRegionNameAsString(), e);
778           throw new RuntimeException(e);
779         }
780         break;
781 
782       case STARTED_REGION_A_CREATION:
783         cleanupDaughterRegion(fs, this.parent.getTableDir(),
784           this.hri_a.getEncodedName());
785         break;
786 
787       case STARTED_REGION_B_CREATION:
788         cleanupDaughterRegion(fs, this.parent.getTableDir(),
789           this.hri_b.getEncodedName());
790         break;
791 
792       case OFFLINED_PARENT:
793         if (services != null) services.addToOnlineRegions(this.parent);
794         break;
795 
796       case PONR:
797         // We got to the point-of-no-return so we need to just abort. Return
798         // immediately.  Do not clean up created daughter regions.  They need
799         // to be in place so we don't delete the parent region mistakenly.
800         // See HBASE-3872.
801         return false;
802 
803       default:
804         throw new RuntimeException("Unhandled journal entry: " + je);
805       }
806     }
807     return result;
808   }
809 
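      /** @return Daughter region info covering the lower half of the parent's key range. */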
810   HRegionInfo getFirstDaughter() {
811     return hri_a;
812   }
813 
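      /** @return Daughter region info covering the upper half of the parent's key range. */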
814   HRegionInfo getSecondDaughter() {
815     return hri_b;
816   }
817 
818   // For unit testing.
819   Path getSplitDir() {
820     return this.splitdir;
821   }
822 
823   /**
824    * Clean up any split detritus that may have been left around from previous
825    * split attempts.
826    * Call this method on initial region deploy.  Cleans up any mess
827    * left by previous deploys of passed <code>r</code> region.
828    * @param r
829    * @throws IOException
830    */
831   static void cleanupAnySplitDetritus(final HRegion r) throws IOException {
832     Path splitdir = getSplitDir(r);
833     FileSystem fs = r.getFilesystem();
834     if (!fs.exists(splitdir)) return;
835     // Look at the splitdir.  It could have the encoded names of the daughter
836     // regions we tried to make.  See if the daughter regions actually got made
837     // out under the tabledir.  If here under splitdir still, then the split did
838     // not complete.  Try and do cleanup.  This code WILL NOT catch the case
839     // where we successfully created daughter a but regionserver crashed during
840     // the creation of region b.  In this case, there'll be an orphan daughter
841     // dir in the filesystem.  TODO: Fix.
842     FileStatus [] daughters = fs.listStatus(splitdir, new FSUtils.DirFilter(fs));
843     for (int i = 0; i < daughters.length; i++) {
844       cleanupDaughterRegion(fs, r.getTableDir(),
845         daughters[i].getPath().getName());
846     }
847     cleanupSplitDir(r.getFilesystem(), splitdir);
848     LOG.info("Cleaned up old failed split transaction detritus: " + splitdir);
849   }
850 
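      /*
       * Best-effort removal of this region's ephemeral SPLITTING znode during
       * rollback; aborts the server if the ZooKeeper delete fails.
       */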
851   private static void cleanZK(final Server server, final HRegionInfo hri) {
852     try {
853       // Only delete it if it's in the expected state; it could have been hijacked.
854       ZKAssign.deleteNode(server.getZooKeeper(), hri.getEncodedName(),
855         EventType.RS_ZK_REGION_SPLITTING);
856     } catch (KeeperException e) {
857       server.abort("Failed cleanup of " + hri.getRegionNameAsString(), e);
858     }
859   }
860 
861   /**
862    * Creates a new ephemeral node in the SPLITTING state for the specified region.
863    * Create it ephemeral in case regionserver dies mid-split.
864    *
865    * <p>Does not transition nodes from other states.  If a node already exists
866    * for this region, a {@link NodeExistsException} will be thrown.
867    *
868    * @param zkw zk reference
869    * @param region region whose znode will be created in the SPLITTING state
870    * @param serverName server event originates from
872    * @throws KeeperException 
873    * @throws IOException 
874    */
875   void createNodeSplitting(final ZooKeeperWatcher zkw, final HRegionInfo region,
876       final ServerName serverName) throws KeeperException, IOException {
877     LOG.debug(zkw.prefix("Creating ephemeral node for " +
878       region.getEncodedName() + " in SPLITTING state"));
879     RegionTransitionData data =
880       new RegionTransitionData(EventType.RS_ZK_REGION_SPLITTING,
881         region.getRegionName(), serverName);
882 
883     String node = ZKAssign.getNodeName(zkw, region.getEncodedName());
884     if (!ZKUtil.createEphemeralNodeAndWatch(zkw, node, data.getBytes())) {
885       throw new IOException("Failed create of ephemeral " + node);
886     }
887   }
888 
889   /**
890    * Transitions an existing node for the specified region which is
891    * currently in the SPLITTING state to be in the SPLIT state.  Converts the
892    * ephemeral SPLITTING znode to an ephemeral SPLIT node.  Master cleans up
893    * SPLIT znode when it reads it (or if we crash, zk will clean it up).
894    *
895    * <p>Does not transition nodes from other states.  If for some reason the
896    * node could not be transitioned, the method returns -1.  If the transition
897    * is successful, the version of the node after transition is returned.
898    *
899    * <p>This method can fail and return -1 for three different reasons:
900    * <ul><li>Node for this region does not exist</li>
901    * <li>Node for this region is not in SPLITTING state</li>
902    * <li>After verifying SPLITTING state, update fails because of wrong version
903    * (this should never actually happen since an RS only does this transition
904    * following a transition to SPLITTING.  If two RS are conflicting, one would
905    * fail the original transition to SPLITTING and not this transition)</li>
906    * </ul>
907    *
908    * <p>Does not set any watches.
909    *
910    * <p>This method should only be used by a RegionServer when completing the
911    * split of a region.
912    *
913    * @param zkw zk reference
914    * @param parent region to be transitioned to split
915    * @param a Daughter a of split
916    * @param b Daughter b of split
917    * @param serverName server event originates from
918    * @return version of node after transition, -1 if unsuccessful transition
919    * @throws KeeperException if unexpected zookeeper exception
920    * @throws IOException 
921    */
922   private static int transitionNodeSplit(ZooKeeperWatcher zkw,
923       HRegionInfo parent, HRegionInfo a, HRegionInfo b, ServerName serverName,
924       final int znodeVersion)
925   throws KeeperException, IOException {
926     byte [] payload = Writables.getBytes(a, b);
927     return ZKAssign.transitionNode(zkw, parent, serverName,
928       EventType.RS_ZK_REGION_SPLITTING, EventType.RS_ZK_REGION_SPLIT,
929       znodeVersion, payload);
930   }
931 
932   /**
933    * Transitions the parent region's znode from SPLITTING to SPLITTING so the master receives a node-changed callback.
934    * @param zkw zk reference
935    * @param parent region to be transitioned to splitting
936    * @param serverName server event originates from
937    * @param version znode version
938    * @return version of node after transition, -1 if unsuccessful transition
939    * @throws KeeperException
940    * @throws IOException
941    */
942   int transitionNodeSplitting(final ZooKeeperWatcher zkw, final HRegionInfo parent,
943       final ServerName serverName, final int version) throws KeeperException, IOException {
944     return ZKAssign.transitionNode(zkw, parent, serverName,
945       EventType.RS_ZK_REGION_SPLITTING, EventType.RS_ZK_REGION_SPLITTING, version);
946   }
947 
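      /*
       * Re-assert the SPLIT state on the znode (a SPLIT to SPLIT transition) so the
       * master gets a fresh node-changed event in case it missed one; returns -1
       * once the znode no longer exists.
       */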
948   private static int tickleNodeSplit(ZooKeeperWatcher zkw,
949       HRegionInfo parent, HRegionInfo a, HRegionInfo b, ServerName serverName,
950       final int znodeVersion)
951   throws KeeperException, IOException {
952     byte [] payload = Writables.getBytes(a, b);
953     return ZKAssign.transitionNode(zkw, parent, serverName,
954       EventType.RS_ZK_REGION_SPLIT, EventType.RS_ZK_REGION_SPLIT,
955       znodeVersion, payload);
956   }
957 }