View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.regionserver;
20  
21  import static org.apache.hadoop.hbase.executor.EventType.RS_ZK_REQUEST_REGION_SPLIT;
22  import static org.apache.hadoop.hbase.executor.EventType.RS_ZK_REGION_SPLIT;
23  import static org.apache.hadoop.hbase.executor.EventType.RS_ZK_REGION_SPLITTING;
24  
25  import java.io.IOException;
26  import java.io.InterruptedIOException;
27  import java.util.ArrayList;
28  import java.util.List;
29  import java.util.ListIterator;
30  import java.util.Map;
31  import java.util.concurrent.Callable;
32  import java.util.concurrent.ExecutionException;
33  import java.util.concurrent.Executors;
34  import java.util.concurrent.Future;
35  import java.util.concurrent.ThreadFactory;
36  import java.util.concurrent.ThreadPoolExecutor;
37  import java.util.concurrent.TimeUnit;
38  
39  import org.apache.commons.logging.Log;
40  import org.apache.commons.logging.LogFactory;
41  import org.apache.hadoop.classification.InterfaceAudience;
42  import org.apache.hadoop.hbase.HConstants;
43  import org.apache.hadoop.hbase.HRegionInfo;
44  import org.apache.hadoop.hbase.RegionTransition;
45  import org.apache.hadoop.hbase.Server;
46  import org.apache.hadoop.hbase.ServerName;
47  import org.apache.hadoop.hbase.catalog.CatalogTracker;
48  import org.apache.hadoop.hbase.catalog.MetaEditor;
49  import org.apache.hadoop.hbase.client.Mutation;
50  import org.apache.hadoop.hbase.client.Put;
51  import org.apache.hadoop.hbase.executor.EventType;
52  import org.apache.hadoop.hbase.util.Bytes;
53  import org.apache.hadoop.hbase.util.CancelableProgressable;
54  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
55  import org.apache.hadoop.hbase.util.HasThread;
56  import org.apache.hadoop.hbase.util.PairOfSameType;
57  import org.apache.hadoop.hbase.zookeeper.ZKAssign;
58  import org.apache.hadoop.hbase.zookeeper.ZKUtil;
59  import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
60  import org.apache.zookeeper.KeeperException;
61  import org.apache.zookeeper.KeeperException.NodeExistsException;
62  import org.apache.zookeeper.data.Stat;
63  
64  import com.google.common.util.concurrent.ThreadFactoryBuilder;
65  
66  /**
67   * Executes region split as a "transaction".  Call {@link #prepare()} to setup
68   * the transaction, {@link #execute(Server, RegionServerServices)} to run the
69   * transaction and {@link #rollback(Server, RegionServerServices)} to cleanup if execute fails.
70   *
71   * <p>Here is an example of how you would use this class:
72   * <pre>
 *  SplitTransaction st = new SplitTransaction(parent, midKey);
74   *  if (!st.prepare()) return;
75   *  try {
76   *    st.execute(server, services);
77   *  } catch (IOException ioe) {
78   *    try {
79   *      st.rollback(server, services);
80   *      return;
81   *    } catch (RuntimeException e) {
82   *      myAbortable.abort("Failed split, abort");
83   *    }
84   *  }
 * </pre>
 * <p>This class is not thread safe.  Callers need to ensure the split is run by
 * one thread only.
88   */
89  @InterfaceAudience.Private
90  public class SplitTransaction {
91    private static final Log LOG = LogFactory.getLog(SplitTransaction.class);
92  
93    /*
94     * Region to split
95     */
96    private final HRegion parent;
97    private HRegionInfo hri_a;
98    private HRegionInfo hri_b;
99    private long fileSplitTimeout = 30000;
100   private int znodeVersion = -1;
101 
102   /*
103    * Row to split around
104    */
105   private final byte [] splitrow;
106 
107   /**
108    * Types to add to the transaction journal.
109    * Each enum is a step in the split transaction. Used to figure how much
110    * we need to rollback.
111    */
112   enum JournalEntry {
113     /**
114      * Set region as in transition, set it into SPLITTING state.
115      */
116     SET_SPLITTING_IN_ZK,
117     /**
118      * We created the temporary split data directory.
119      */
120     CREATE_SPLIT_DIR,
121     /**
122      * Closed the parent region.
123      */
124     CLOSED_PARENT_REGION,
125     /**
126      * The parent has been taken out of the server's online regions list.
127      */
128     OFFLINED_PARENT,
129     /**
130      * Started in on creation of the first daughter region.
131      */
132     STARTED_REGION_A_CREATION,
133     /**
134      * Started in on the creation of the second daughter region.
135      */
136     STARTED_REGION_B_CREATION,
137     /**
138      * Point of no return.
139      * If we got here, then transaction is not recoverable other than by
140      * crashing out the regionserver.
141      */
142     PONR
143   }
144 
145   /*
146    * Journal of how far the split transaction has progressed.
147    */
148   private final List<JournalEntry> journal = new ArrayList<JournalEntry>();
149 
/**
 * Creates a split transaction for a region. No validation happens here; call
 * {@link #prepare()} before executing.
 * @param r Region to split
 * @param splitrow Row to split around
 */
public SplitTransaction(final HRegion r, final byte [] splitrow) {
  this.splitrow = splitrow;
  this.parent = r;
}
159 
160   /**
161    * Does checks on split inputs.
162    * @return <code>true</code> if the region is splittable else
163    * <code>false</code> if it is not (e.g. its already closed, etc.).
164    */
165   public boolean prepare() {
166     if (!this.parent.isSplittable()) return false;
167     // Split key can be null if this region is unsplittable; i.e. has refs.
168     if (this.splitrow == null) return false;
169     HRegionInfo hri = this.parent.getRegionInfo();
170     parent.prepareToSplit();
171     // Check splitrow.
172     byte [] startKey = hri.getStartKey();
173     byte [] endKey = hri.getEndKey();
174     if (Bytes.equals(startKey, splitrow) ||
175         !this.parent.getRegionInfo().containsRow(splitrow)) {
176       LOG.info("Split row is not inside region key range or is equal to " +
177           "startkey: " + Bytes.toStringBinary(this.splitrow));
178       return false;
179     }
180     long rid = getDaughterRegionIdTimestamp(hri);
181     this.hri_a = new HRegionInfo(hri.getTable(), startKey, this.splitrow, false, rid);
182     this.hri_b = new HRegionInfo(hri.getTable(), this.splitrow, endKey, false, rid);
183     return true;
184   }
185 
186   /**
187    * Calculate daughter regionid to use.
188    * @param hri Parent {@link HRegionInfo}
189    * @return Daughter region id (timestamp) to use.
190    */
191   private static long getDaughterRegionIdTimestamp(final HRegionInfo hri) {
192     long rid = EnvironmentEdgeManager.currentTimeMillis();
193     // Regionid is timestamp.  Can't be less than that of parent else will insert
194     // at wrong location in hbase:meta (See HBASE-710).
195     if (rid < hri.getRegionId()) {
196       LOG.warn("Clock skew; parent regions id is " + hri.getRegionId() +
197         " but current time here is " + rid);
198       rid = hri.getRegionId() + 1;
199     }
200     return rid;
201   }
202 
203   private static IOException closedByOtherException = new IOException(
204       "Failed to close region: already closed by another thread");
205 
206   /**
207    * Prepare the regions and region files.
208    * @param server Hosting server instance.  Can be null when testing (won't try
209    * and update in zk if a null server)
210    * @param services Used to online/offline regions.
211    * @throws IOException If thrown, transaction failed.
212    *    Call {@link #rollback(Server, RegionServerServices)}
213    * @return Regions created
214    */
215   /* package */PairOfSameType<HRegion> createDaughters(final Server server,
216       final RegionServerServices services) throws IOException {
217     LOG.info("Starting split of region " + this.parent);
218     if ((server != null && server.isStopped()) ||
219         (services != null && services.isStopping())) {
220       throw new IOException("Server is stopped or stopping");
221     }
222     assert !this.parent.lock.writeLock().isHeldByCurrentThread():
223       "Unsafe to hold write lock while performing RPCs";
224 
225     // Coprocessor callback
226     if (this.parent.getCoprocessorHost() != null) {
227       this.parent.getCoprocessorHost().preSplit();
228     }
229 
230     // Coprocessor callback
231     if (this.parent.getCoprocessorHost() != null) {
232       this.parent.getCoprocessorHost().preSplit(this.splitrow);
233     }
234 
235     // If true, no cluster to write meta edits to or to update znodes in.
236     boolean testing = server == null? true:
237         server.getConfiguration().getBoolean("hbase.testing.nocluster", false);
238     this.fileSplitTimeout = testing ? this.fileSplitTimeout :
239         server.getConfiguration().getLong("hbase.regionserver.fileSplitTimeout",
240           this.fileSplitTimeout);
241 
242     PairOfSameType<HRegion> daughterRegions = stepsBeforePONR(server, services, testing);
243 
244     List<Mutation> metaEntries = new ArrayList<Mutation>();
245     if (this.parent.getCoprocessorHost() != null) {
246       if (this.parent.getCoprocessorHost().
247           preSplitBeforePONR(this.splitrow, metaEntries)) {
248         throw new IOException("Coprocessor bypassing region "
249             + this.parent.getRegionNameAsString() + " split.");
250       }
251       try {
252         for (Mutation p : metaEntries) {
253           HRegionInfo.parseRegionName(p.getRow());
254         }
255       } catch (IOException e) {
256         LOG.error("Row key of mutation from coprossor is not parsable as region name."
257             + "Mutations from coprocessor should only for hbase:meta table.");
258         throw e;
259       }
260     }
261 
262     // This is the point of no return.  Adding subsequent edits to .META. as we
263     // do below when we do the daughter opens adding each to .META. can fail in
264     // various interesting ways the most interesting of which is a timeout
265     // BUT the edits all go through (See HBASE-3872).  IF we reach the PONR
266     // then subsequent failures need to crash out this regionserver; the
267     // server shutdown processing should be able to fix-up the incomplete split.
268     // The offlined parent will have the daughters as extra columns.  If
269     // we leave the daughter regions in place and do not remove them when we
270     // crash out, then they will have their references to the parent in place
271     // still and the server shutdown fixup of .META. will point to these
272     // regions.
273     // We should add PONR JournalEntry before offlineParentInMeta,so even if
274     // OfflineParentInMeta timeout,this will cause regionserver exit,and then
275     // master ServerShutdownHandler will fix daughter & avoid data loss. (See
276     // HBase-4562).
277     this.journal.add(JournalEntry.PONR);
278 
279     // Edit parent in meta.  Offlines parent region and adds splita and splitb
280     // as an atomic update. See HBASE-7721. This update to META makes the region
281     // will determine whether the region is split or not in case of failures.
282     // If it is successful, master will roll-forward, if not, master will rollback
283     // and assign the parent region.
284     if (!testing) {
285       if (metaEntries == null || metaEntries.isEmpty()) {
286         MetaEditor.splitRegion(server.getCatalogTracker(),
287             parent.getRegionInfo(), daughterRegions.getFirst().getRegionInfo(),
288             daughterRegions.getSecond().getRegionInfo(), server.getServerName());
289       } else {
290         offlineParentInMetaAndputMetaEntries(server.getCatalogTracker(),
291           parent.getRegionInfo(), daughterRegions.getFirst().getRegionInfo(), daughterRegions
292               .getSecond().getRegionInfo(), server.getServerName(), metaEntries);
293       }
294     }
295     return daughterRegions;
296   }
297 
298   public PairOfSameType<HRegion> stepsBeforePONR(final Server server,
299       final RegionServerServices services, boolean testing) throws IOException {
300     // Set ephemeral SPLITTING znode up in zk.  Mocked servers sometimes don't
301     // have zookeeper so don't do zk stuff if server or zookeeper is null
302     if (server != null && server.getZooKeeper() != null) {
303       try {
304         createNodeSplitting(server.getZooKeeper(),
305           parent.getRegionInfo(), server.getServerName(), hri_a, hri_b);
306       } catch (KeeperException e) {
307         throw new IOException("Failed creating PENDING_SPLIT znode on " +
308           this.parent.getRegionNameAsString(), e);
309       }
310     }
311     this.journal.add(JournalEntry.SET_SPLITTING_IN_ZK);
312     if (server != null && server.getZooKeeper() != null) {
313       // After creating the split node, wait for master to transition it
314       // from PENDING_SPLIT to SPLITTING so that we can move on. We want master
315       // knows about it and won't transition any region which is splitting.
316       znodeVersion = getZKNode(server, services);
317     }
318 
319     this.parent.getRegionFileSystem().createSplitsDir();
320     this.journal.add(JournalEntry.CREATE_SPLIT_DIR);
321 
322     Map<byte[], List<StoreFile>> hstoreFilesToSplit = null;
323     Exception exceptionToThrow = null;
324     try{
325       hstoreFilesToSplit = this.parent.close(false);
326     } catch (Exception e) {
327       exceptionToThrow = e;
328     }
329     if (exceptionToThrow == null && hstoreFilesToSplit == null) {
330       // The region was closed by a concurrent thread.  We can't continue
331       // with the split, instead we must just abandon the split.  If we
332       // reopen or split this could cause problems because the region has
333       // probably already been moved to a different server, or is in the
334       // process of moving to a different server.
335       exceptionToThrow = closedByOtherException;
336     }
337     if (exceptionToThrow != closedByOtherException) {
338       this.journal.add(JournalEntry.CLOSED_PARENT_REGION);
339     }
340     if (exceptionToThrow != null) {
341       if (exceptionToThrow instanceof IOException) throw (IOException)exceptionToThrow;
342       throw new IOException(exceptionToThrow);
343     }
344     if (!testing) {
345       services.removeFromOnlineRegions(this.parent, null);
346     }
347     this.journal.add(JournalEntry.OFFLINED_PARENT);
348 
349     // TODO: If splitStoreFiles were multithreaded would we complete steps in
350     // less elapsed time?  St.Ack 20100920
351     //
352     // splitStoreFiles creates daughter region dirs under the parent splits dir
353     // Nothing to unroll here if failure -- clean up of CREATE_SPLIT_DIR will
354     // clean this up.
355     splitStoreFiles(hstoreFilesToSplit);
356 
357     // Log to the journal that we are creating region A, the first daughter
358     // region.  We could fail halfway through.  If we do, we could have left
359     // stuff in fs that needs cleanup -- a storefile or two.  Thats why we
360     // add entry to journal BEFORE rather than AFTER the change.
361     this.journal.add(JournalEntry.STARTED_REGION_A_CREATION);
362     HRegion a = this.parent.createDaughterRegionFromSplits(this.hri_a);
363 
364     // Ditto
365     this.journal.add(JournalEntry.STARTED_REGION_B_CREATION);
366     HRegion b = this.parent.createDaughterRegionFromSplits(this.hri_b);
367     return new PairOfSameType<HRegion>(a, b);
368   }
369 
370   /**
371    * Perform time consuming opening of the daughter regions.
372    * @param server Hosting server instance.  Can be null when testing (won't try
373    * and update in zk if a null server)
374    * @param services Used to online/offline regions.
375    * @param a first daughter region
376    * @param a second daughter region
377    * @throws IOException If thrown, transaction failed.
378    *          Call {@link #rollback(Server, RegionServerServices)}
379    */
380   /* package */void openDaughters(final Server server,
381       final RegionServerServices services, HRegion a, HRegion b)
382       throws IOException {
383     boolean stopped = server != null && server.isStopped();
384     boolean stopping = services != null && services.isStopping();
385     // TODO: Is this check needed here?
386     if (stopped || stopping) {
387       LOG.info("Not opening daughters " +
388           b.getRegionInfo().getRegionNameAsString() +
389           " and " +
390           a.getRegionInfo().getRegionNameAsString() +
391           " because stopping=" + stopping + ", stopped=" + stopped);
392     } else {
393       // Open daughters in parallel.
394       DaughterOpener aOpener = new DaughterOpener(server, a);
395       DaughterOpener bOpener = new DaughterOpener(server, b);
396       aOpener.start();
397       bOpener.start();
398       try {
399         aOpener.join();
400         bOpener.join();
401       } catch (InterruptedException e) {
402         throw (InterruptedIOException)new InterruptedIOException().initCause(e);
403       }
404       if (aOpener.getException() != null) {
405         throw new IOException("Failed " +
406           aOpener.getName(), aOpener.getException());
407       }
408       if (bOpener.getException() != null) {
409         throw new IOException("Failed " +
410           bOpener.getName(), bOpener.getException());
411       }
412       if (services != null) {
413         try {
414           // add 2nd daughter first (see HBASE-4335)
415           services.postOpenDeployTasks(b, server.getCatalogTracker());
416           // Should add it to OnlineRegions
417           services.addToOnlineRegions(b);
418           services.postOpenDeployTasks(a, server.getCatalogTracker());
419           services.addToOnlineRegions(a);
420         } catch (KeeperException ke) {
421           throw new IOException(ke);
422         }
423       }
424     }
425   }
426 
427   /**
428    * Finish off split transaction, transition the zknode
429    * @param server Hosting server instance.  Can be null when testing (won't try
430    * and update in zk if a null server)
431    * @param services Used to online/offline regions.
432    * @param a first daughter region
433    * @param a second daughter region
434    * @throws IOException If thrown, transaction failed.
435    *          Call {@link #rollback(Server, RegionServerServices)}
436    */
437   /* package */void transitionZKNode(final Server server,
438       final RegionServerServices services, HRegion a, HRegion b)
439       throws IOException {
440     // Tell master about split by updating zk.  If we fail, abort.
441     if (server != null && server.getZooKeeper() != null) {
442       try {
443         this.znodeVersion = transitionSplittingNode(server.getZooKeeper(),
444           parent.getRegionInfo(), a.getRegionInfo(), b.getRegionInfo(),
445           server.getServerName(), this.znodeVersion,
446           RS_ZK_REGION_SPLITTING, RS_ZK_REGION_SPLIT);
447 
448         int spins = 0;
449         // Now wait for the master to process the split. We know it's done
450         // when the znode is deleted. The reason we keep tickling the znode is
451         // that it's possible for the master to miss an event.
452         do {
453           if (spins % 10 == 0) {
454             LOG.debug("Still waiting on the master to process the split for " +
455                 this.parent.getRegionInfo().getEncodedName());
456           }
457           Thread.sleep(100);
458           // When this returns -1 it means the znode doesn't exist
459           this.znodeVersion = transitionSplittingNode(server.getZooKeeper(),
460             parent.getRegionInfo(), a.getRegionInfo(), b.getRegionInfo(),
461             server.getServerName(), this.znodeVersion,
462             RS_ZK_REGION_SPLIT, RS_ZK_REGION_SPLIT);
463           spins++;
464         } while (this.znodeVersion != -1 && !server.isStopped()
465             && !services.isStopping());
466       } catch (Exception e) {
467         if (e instanceof InterruptedException) {
468           Thread.currentThread().interrupt();
469         }
470         throw new IOException("Failed telling master about split", e);
471       }
472     }
473 
474     // Coprocessor callback
475     if (this.parent.getCoprocessorHost() != null) {
476       this.parent.getCoprocessorHost().postSplit(a,b);
477     }
478 
479     // Leaving here, the splitdir with its dross will be in place but since the
480     // split was successful, just leave it; it'll be cleaned when parent is
481     // deleted and cleaned up.
482   }
483 
484   /**
485    * Wait for the splitting node to be transitioned from pending_split
486    * to splitting by master. That's how we are sure master has processed
487    * the event and is good with us to move on. If we don't get any update,
488    * we periodically transition the node so that master gets the callback.
489    * If the node is removed or is not in pending_split state any more,
490    * we abort the split.
491    */
492   private int getZKNode(final Server server,
493       final RegionServerServices services) throws IOException {
494     // Wait for the master to process the pending_split.
495     try {
496       int spins = 0;
497       Stat stat = new Stat();
498       ZooKeeperWatcher zkw = server.getZooKeeper();
499       ServerName expectedServer = server.getServerName();
500       String node = parent.getRegionInfo().getEncodedName();
501       while (!(server.isStopped() || services.isStopping())) {
502         if (spins % 5 == 0) {
503           LOG.debug("Still waiting for master to process "
504             + "the pending_split for " + node);
505           transitionSplittingNode(zkw, parent.getRegionInfo(),
506             hri_a, hri_b, expectedServer, -1, RS_ZK_REQUEST_REGION_SPLIT,
507             RS_ZK_REQUEST_REGION_SPLIT);
508         }
509         Thread.sleep(100);
510         spins++;
511         byte [] data = ZKAssign.getDataNoWatch(zkw, node, stat);
512         if (data == null) {
513           throw new IOException("Data is null, splitting node "
514             + node + " no longer exists");
515         }
516         RegionTransition rt = RegionTransition.parseFrom(data);
517         EventType et = rt.getEventType();
518         if (et == RS_ZK_REGION_SPLITTING) {
519           ServerName serverName = rt.getServerName();
520           if (!serverName.equals(expectedServer)) {
521             throw new IOException("Splitting node " + node + " is for "
522               + serverName + ", not us " + expectedServer);
523           }
524           byte [] payloadOfSplitting = rt.getPayload();
525           List<HRegionInfo> splittingRegions = HRegionInfo.parseDelimitedFrom(
526             payloadOfSplitting, 0, payloadOfSplitting.length);
527           assert splittingRegions.size() == 2;
528           HRegionInfo a = splittingRegions.get(0);
529           HRegionInfo b = splittingRegions.get(1);
530           if (!(hri_a.equals(a) && hri_b.equals(b))) {
531             throw new IOException("Splitting node " + node + " is for " + a + ", "
532               + b + ", not expected daughters: " + hri_a + ", " + hri_b);
533           }
534           // Master has processed it.
535           return stat.getVersion();
536         }
537         if (et != RS_ZK_REQUEST_REGION_SPLIT) {
538           throw new IOException("Splitting node " + node
539             + " moved out of splitting to " + et);
540         }
541       }
542       // Server is stopping/stopped
543       throw new IOException("Server is "
544         + (services.isStopping() ? "stopping" : "stopped"));
545     } catch (Exception e) {
546       if (e instanceof InterruptedException) {
547         Thread.currentThread().interrupt();
548       }
549       throw new IOException("Failed getting SPLITTING znode on "
550         + parent.getRegionNameAsString(), e);
551     }
552   }
553 
554   /**
555    * Run the transaction.
556    * @param server Hosting server instance.  Can be null when testing (won't try
557    * and update in zk if a null server)
558    * @param services Used to online/offline regions.
559    * @throws IOException If thrown, transaction failed.
560    *          Call {@link #rollback(Server, RegionServerServices)}
561    * @return Regions created
562    * @throws IOException
563    * @see #rollback(Server, RegionServerServices)
564    */
565   public PairOfSameType<HRegion> execute(final Server server,
566       final RegionServerServices services)
567   throws IOException {
568     PairOfSameType<HRegion> regions = createDaughters(server, services);
569     if (this.parent.getCoprocessorHost() != null) {
570       this.parent.getCoprocessorHost().preSplitAfterPONR();
571     }
572     return stepsAfterPONR(server, services, regions);
573   }
574 
575   public PairOfSameType<HRegion> stepsAfterPONR(final Server server,
576       final RegionServerServices services, PairOfSameType<HRegion> regions)
577       throws IOException {
578     openDaughters(server, services, regions.getFirst(), regions.getSecond());
579     transitionZKNode(server, services, regions.getFirst(), regions.getSecond());
580     return regions;
581   }
582 
583   private void offlineParentInMetaAndputMetaEntries(CatalogTracker catalogTracker,
584       HRegionInfo parent, HRegionInfo splitA, HRegionInfo splitB,
585       ServerName serverName, List<Mutation> metaEntries) throws IOException {
586     List<Mutation> mutations = metaEntries;
587     HRegionInfo copyOfParent = new HRegionInfo(parent);
588     copyOfParent.setOffline(true);
589     copyOfParent.setSplit(true);
590 
591     //Put for parent
592     Put putParent = MetaEditor.makePutFromRegionInfo(copyOfParent);
593     MetaEditor.addDaughtersToPut(putParent, splitA, splitB);
594     mutations.add(putParent);
595     
596     //Puts for daughters
597     Put putA = MetaEditor.makePutFromRegionInfo(splitA);
598     Put putB = MetaEditor.makePutFromRegionInfo(splitB);
599 
600     addLocation(putA, serverName, 1); //these are new regions, openSeqNum = 1 is fine.
601     addLocation(putB, serverName, 1);
602     mutations.add(putA);
603     mutations.add(putB);
604     MetaEditor.mutateMetaTable(catalogTracker, mutations);
605   }
606 
607   public Put addLocation(final Put p, final ServerName sn, long openSeqNum) {
608     p.addImmutable(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER,
609       Bytes.toBytes(sn.getHostAndPort()));
610     p.addImmutable(HConstants.CATALOG_FAMILY, HConstants.STARTCODE_QUALIFIER,
611       Bytes.toBytes(sn.getStartcode()));
612     p.addImmutable(HConstants.CATALOG_FAMILY, HConstants.SEQNUM_QUALIFIER,
613         Bytes.toBytes(openSeqNum));
614     return p;
615   }
616 
617   /*
618    * Open daughter region in its own thread.
619    * If we fail, abort this hosting server.
620    */
621   class DaughterOpener extends HasThread {
622     private final Server server;
623     private final HRegion r;
624     private Throwable t = null;
625 
626     DaughterOpener(final Server s, final HRegion r) {
627       super((s == null? "null-services": s.getServerName()) +
628         "-daughterOpener=" + r.getRegionInfo().getEncodedName());
629       setDaemon(true);
630       this.server = s;
631       this.r = r;
632     }
633 
634     /**
635      * @return Null if open succeeded else exception that causes us fail open.
636      * Call it after this thread exits else you may get wrong view on result.
637      */
638     Throwable getException() {
639       return this.t;
640     }
641 
642     @Override
643     public void run() {
644       try {
645         openDaughterRegion(this.server, r);
646       } catch (Throwable t) {
647         this.t = t;
648       }
649     }
650   }
651 
652   /**
653    * Open daughter regions, add them to online list and update meta.
654    * @param server
655    * @param daughter
656    * @throws IOException
657    * @throws KeeperException
658    */
659   void openDaughterRegion(final Server server, final HRegion daughter)
660   throws IOException, KeeperException {
661     HRegionInfo hri = daughter.getRegionInfo();
662     LoggingProgressable reporter = server == null ? null
663         : new LoggingProgressable(hri, server.getConfiguration().getLong(
664             "hbase.regionserver.split.daughter.open.log.interval", 10000));
665     daughter.openHRegion(reporter);
666   }
667 
668   static class LoggingProgressable implements CancelableProgressable {
669     private final HRegionInfo hri;
670     private long lastLog = -1;
671     private final long interval;
672 
673     LoggingProgressable(final HRegionInfo hri, final long interval) {
674       this.hri = hri;
675       this.interval = interval;
676     }
677 
678     @Override
679     public boolean progress() {
680       long now = System.currentTimeMillis();
681       if (now - lastLog > this.interval) {
682         LOG.info("Opening " + this.hri.getRegionNameAsString());
683         this.lastLog = now;
684       }
685       return true;
686     }
687   }
688 
689   private void splitStoreFiles(final Map<byte[], List<StoreFile>> hstoreFilesToSplit)
690       throws IOException {
691     if (hstoreFilesToSplit == null) {
692       // Could be null because close didn't succeed -- for now consider it fatal
693       throw new IOException("Close returned empty list of StoreFiles");
694     }
695     // The following code sets up a thread pool executor with as many slots as
696     // there's files to split. It then fires up everything, waits for
697     // completion and finally checks for any exception
698     int nbFiles = hstoreFilesToSplit.size();
699     if (nbFiles == 0) {
700       // no file needs to be splitted.
701       return;
702     }
703     ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
704     builder.setNameFormat("StoreFileSplitter-%1$d");
705     ThreadFactory factory = builder.build();
706     ThreadPoolExecutor threadPool =
707       (ThreadPoolExecutor) Executors.newFixedThreadPool(nbFiles, factory);
708     List<Future<Void>> futures = new ArrayList<Future<Void>>(nbFiles);
709 
710     // Split each store file.
711     for (Map.Entry<byte[], List<StoreFile>> entry: hstoreFilesToSplit.entrySet()) {
712       for (StoreFile sf: entry.getValue()) {
713         StoreFileSplitter sfs = new StoreFileSplitter(entry.getKey(), sf);
714         futures.add(threadPool.submit(sfs));
715       }
716     }
717     // Shutdown the pool
718     threadPool.shutdown();
719 
720     // Wait for all the tasks to finish
721     try {
722       boolean stillRunning = !threadPool.awaitTermination(
723           this.fileSplitTimeout, TimeUnit.MILLISECONDS);
724       if (stillRunning) {
725         threadPool.shutdownNow();
726         // wait for the thread to shutdown completely.
727         while (!threadPool.isTerminated()) {
728           Thread.sleep(50);
729         }
730         throw new IOException("Took too long to split the" +
731             " files and create the references, aborting split");
732       }
733     } catch (InterruptedException e) {
734       throw (InterruptedIOException)new InterruptedIOException().initCause(e);
735     }
736 
737     // Look for any exception
738     for (Future<Void> future: futures) {
739       try {
740         future.get();
741       } catch (InterruptedException e) {
742         throw (InterruptedIOException)new InterruptedIOException().initCause(e);
743       } catch (ExecutionException e) {
744         throw new IOException(e);
745       }
746     }
747   }
748 
749   private void splitStoreFile(final byte[] family, final StoreFile sf) throws IOException {
750     HRegionFileSystem fs = this.parent.getRegionFileSystem();
751     String familyName = Bytes.toString(family);
752     fs.splitStoreFile(this.hri_a, familyName, sf, this.splitrow, false);
753     fs.splitStoreFile(this.hri_b, familyName, sf, this.splitrow, true);
754   }
755 
756   /**
757    * Utility class used to do the file splitting / reference writing
758    * in parallel instead of sequentially.
759    */
760   class StoreFileSplitter implements Callable<Void> {
761     private final byte[] family;
762     private final StoreFile sf;
763 
764     /**
765      * Constructor that takes what it needs to split
766      * @param family Family that contains the store file
767      * @param sf which file
768      */
769     public StoreFileSplitter(final byte[] family, final StoreFile sf) {
770       this.sf = sf;
771       this.family = family;
772     }
773 
774     public Void call() throws IOException {
775       splitStoreFile(family, sf);
776       return null;
777     }
778   }
779 
780   /**
781    * @param server Hosting server instance (May be null when testing).
782    * @param services
783    * @throws IOException If thrown, rollback failed.  Take drastic action.
784    * @return True if we successfully rolled back, false if we got to the point
785    * of no return and so now need to abort the server to minimize damage.
786    */
787   @SuppressWarnings("deprecation")
788   public boolean rollback(final Server server, final RegionServerServices services)
789   throws IOException {
790     // Coprocessor callback
791     if (this.parent.getCoprocessorHost() != null) {
792       this.parent.getCoprocessorHost().preRollBackSplit();
793     }
794 
795     boolean result = true;
796     ListIterator<JournalEntry> iterator =
797       this.journal.listIterator(this.journal.size());
798     // Iterate in reverse.
799     while (iterator.hasPrevious()) {
800       JournalEntry je = iterator.previous();
801       switch(je) {
802 
803       case SET_SPLITTING_IN_ZK:
804         if (server != null && server.getZooKeeper() != null) {
805           cleanZK(server, this.parent.getRegionInfo());
806         }
807         break;
808 
809       case CREATE_SPLIT_DIR:
810         this.parent.writestate.writesEnabled = true;
811         this.parent.getRegionFileSystem().cleanupSplitsDir();
812         break;
813 
814       case CLOSED_PARENT_REGION:
815         try {
816           // So, this returns a seqid but if we just closed and then reopened, we
817           // should be ok. On close, we flushed using sequenceid obtained from
818           // hosting regionserver so no need to propagate the sequenceid returned
819           // out of initialize below up into regionserver as we normally do.
820           // TODO: Verify.
821           this.parent.initialize();
822         } catch (IOException e) {
823           LOG.error("Failed rollbacking CLOSED_PARENT_REGION of region " +
824             this.parent.getRegionNameAsString(), e);
825           throw new RuntimeException(e);
826         }
827         break;
828 
829       case STARTED_REGION_A_CREATION:
830         this.parent.getRegionFileSystem().cleanupDaughterRegion(this.hri_a);
831         break;
832 
833       case STARTED_REGION_B_CREATION:
834         this.parent.getRegionFileSystem().cleanupDaughterRegion(this.hri_b);
835         break;
836 
837       case OFFLINED_PARENT:
838         if (services != null) services.addToOnlineRegions(this.parent);
839         break;
840 
841       case PONR:
842         // We got to the point-of-no-return so we need to just abort. Return
843         // immediately.  Do not clean up created daughter regions.  They need
844         // to be in place so we don't delete the parent region mistakenly.
845         // See HBASE-3872.
846         return false;
847 
848       default:
849         throw new RuntimeException("Unhandled journal entry: " + je);
850       }
851     }
852     // Coprocessor callback
853     if (this.parent.getCoprocessorHost() != null) {
854       this.parent.getCoprocessorHost().postRollBackSplit();
855     }
856     return result;
857   }
858 
859   HRegionInfo getFirstDaughter() {
860     return hri_a;
861   }
862 
863   HRegionInfo getSecondDaughter() {
864     return hri_b;
865   }
866 
867   private static void cleanZK(final Server server, final HRegionInfo hri) {
868     try {
869       // Only delete if its in expected state; could have been hijacked.
870       if (!ZKAssign.deleteNode(server.getZooKeeper(), hri.getEncodedName(),
871           RS_ZK_REQUEST_REGION_SPLIT, server.getServerName())) {
872         ZKAssign.deleteNode(server.getZooKeeper(), hri.getEncodedName(),
873           RS_ZK_REGION_SPLITTING, server.getServerName());
874       }
875     } catch (KeeperException.NoNodeException e) {
876       LOG.info("Failed cleanup zk node of " + hri.getRegionNameAsString(), e);
877     } catch (KeeperException e) {
878       server.abort("Failed cleanup of " + hri.getRegionNameAsString(), e);
879     }
880   }
881 
882   /**
883    * Creates a new ephemeral node in the PENDING_SPLIT state for the specified region.
884    * Create it ephemeral in case regionserver dies mid-split.
885    *
886    * <p>Does not transition nodes from other states.  If a node already exists
887    * for this region, a {@link NodeExistsException} will be thrown.
888    *
889    * @param zkw zk reference
890    * @param region region to be created as offline
891    * @param serverName server event originates from
892    * @throws KeeperException
893    * @throws IOException
894    */
895   public static void createNodeSplitting(final ZooKeeperWatcher zkw, final HRegionInfo region,
896       final ServerName serverName, final HRegionInfo a,
897       final HRegionInfo b) throws KeeperException, IOException {
898     LOG.debug(zkw.prefix("Creating ephemeral node for " +
899       region.getEncodedName() + " in PENDING_SPLIT state"));
900     byte [] payload = HRegionInfo.toDelimitedByteArray(a, b);
901     RegionTransition rt = RegionTransition.createRegionTransition(
902       RS_ZK_REQUEST_REGION_SPLIT, region.getRegionName(), serverName, payload);
903     String node = ZKAssign.getNodeName(zkw, region.getEncodedName());
904     if (!ZKUtil.createEphemeralNodeAndWatch(zkw, node, rt.toByteArray())) {
905       throw new IOException("Failed create of ephemeral " + node);
906     }
907   }
908 
909   /**
910    * Transitions an existing ephemeral node for the specified region which is
911    * currently in the begin state to be in the end state. Master cleans up the
912    * final SPLIT znode when it reads it (or if we crash, zk will clean it up).
913    *
914    * <p>Does not transition nodes from other states. If for some reason the
915    * node could not be transitioned, the method returns -1. If the transition
916    * is successful, the version of the node after transition is returned.
917    *
918    * <p>This method can fail and return false for three different reasons:
919    * <ul><li>Node for this region does not exist</li>
920    * <li>Node for this region is not in the begin state</li>
921    * <li>After verifying the begin state, update fails because of wrong version
922    * (this should never actually happen since an RS only does this transition
923    * following a transition to the begin state. If two RS are conflicting, one would
924    * fail the original transition to the begin state and not this transition)</li>
925    * </ul>
926    *
927    * <p>Does not set any watches.
928    *
929    * <p>This method should only be used by a RegionServer when splitting a region.
930    *
931    * @param zkw zk reference
932    * @param parent region to be transitioned to opened
933    * @param a Daughter a of split
934    * @param b Daughter b of split
935    * @param serverName server event originates from
936    * @param znodeVersion expected version of data before modification
937    * @param beginState the expected current state the znode should be
938    * @param endState the state to be transition to
939    * @return version of node after transition, -1 if unsuccessful transition
940    * @throws KeeperException if unexpected zookeeper exception
941    * @throws IOException
942    */
943   public static int transitionSplittingNode(ZooKeeperWatcher zkw,
944       HRegionInfo parent, HRegionInfo a, HRegionInfo b, ServerName serverName,
945       final int znodeVersion, final EventType beginState,
946       final EventType endState) throws KeeperException, IOException {
947     byte [] payload = HRegionInfo.toDelimitedByteArray(a, b);
948     return ZKAssign.transitionNode(zkw, parent, serverName,
949       beginState, endState, znodeVersion, payload);
950   }
951 }