View Javadoc

1   /**
2    * Copyright The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one or more
5    * contributor license agreements. See the NOTICE file distributed with this
6    * work for additional information regarding copyright ownership. The ASF
7    * licenses this file to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance with the License.
9    * You may obtain a copy of the License at
10   *
11   * http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
15   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
16   * License for the specific language governing permissions and limitationsME
17   * under the License.
18   */
19  package org.apache.hadoop.hbase.regionserver;
20  
21  import java.io.IOException;
22  import java.io.InterruptedIOException;
23  import java.security.PrivilegedExceptionAction;
24  import java.util.ArrayList;
25  import java.util.List;
26  import java.util.ListIterator;
27  import java.util.Map;
28  
29  import org.apache.commons.logging.Log;
30  import org.apache.commons.logging.LogFactory;
31  import org.apache.hadoop.hbase.classification.InterfaceAudience;
32  import org.apache.hadoop.fs.Path;
33  import org.apache.hadoop.hbase.HConstants;
34  import org.apache.hadoop.hbase.HRegionInfo;
35  import org.apache.hadoop.hbase.MetaMutationAnnotation;
36  import org.apache.hadoop.hbase.Server;
37  import org.apache.hadoop.hbase.ServerName;
38  import org.apache.hadoop.hbase.MetaTableAccessor;
39  import org.apache.hadoop.hbase.client.Delete;
40  import org.apache.hadoop.hbase.client.HConnection;
41  import org.apache.hadoop.hbase.client.Mutation;
42  import org.apache.hadoop.hbase.client.Put;
43  import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
44  import org.apache.hadoop.hbase.coordination.RegionMergeCoordination.RegionMergeDetails;
45  import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
46  import org.apache.hadoop.hbase.regionserver.SplitTransactionImpl.LoggingProgressable;
47  import org.apache.hadoop.hbase.security.User;
48  import org.apache.hadoop.hbase.util.Bytes;
49  import org.apache.hadoop.hbase.util.ConfigUtil;
50  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
51  import org.apache.hadoop.hbase.util.Pair;
52  import org.apache.zookeeper.KeeperException;
53  
54  @InterfaceAudience.Private
55  public class RegionMergeTransactionImpl implements RegionMergeTransaction {
56    private static final Log LOG = LogFactory.getLog(RegionMergeTransactionImpl.class);
57  
58    // Merged region info
59    private HRegionInfo mergedRegionInfo;
60    // region_a sorts before region_b
61    private final HRegion region_a;
62    private final HRegion region_b;
63    // merges dir is under region_a
64    private final Path mergesdir;
65    // We only merge adjacent regions if forcible is false
66    private final boolean forcible;
67    private boolean useCoordinationForAssignment;
68    private final long masterSystemTime;
69  
70    /*
71     * Transaction state for listener, only valid during execute and
72     * rollback
73     */
74    private RegionMergeTransactionPhase currentPhase = RegionMergeTransactionPhase.STARTED;
75    private Server server;
76    private RegionServerServices rsServices;
77  
78    public static class JournalEntryImpl implements JournalEntry {
79      private RegionMergeTransactionPhase type;
80      private long timestamp;
81  
82      public JournalEntryImpl(RegionMergeTransactionPhase type) {
83        this(type, EnvironmentEdgeManager.currentTime());
84      }
85  
86      public JournalEntryImpl(RegionMergeTransactionPhase type, long timestamp) {
87        this.type = type;
88        this.timestamp = timestamp;
89      }
90  
91      @Override
92      public String toString() {
93        StringBuilder sb = new StringBuilder();
94        sb.append(type);
95        sb.append(" at ");
96        sb.append(timestamp);
97        return sb.toString();
98      }
99  
100     @Override
101     public RegionMergeTransactionPhase getPhase() {
102       return type;
103     }
104 
105     @Override
106     public long getTimeStamp() {
107       return timestamp;
108     }
109   }
110 
111   /*
112    * Journal of how far the merge transaction has progressed.
113    */
114   private final List<JournalEntry> journal = new ArrayList<JournalEntry>();
115 
116   /*
117    * Listeners
118    */
119   private final ArrayList<TransactionListener> listeners = new ArrayList<TransactionListener>();
120 
121   private static IOException closedByOtherException = new IOException(
122       "Failed to close region: already closed by another thread");
123 
124   private RegionServerCoprocessorHost rsCoprocessorHost = null;
125 
126   private RegionMergeDetails rmd;
127 
128   /**
129    * Constructor
130    * @param a region a to merge
131    * @param b region b to merge
132    * @param forcible if false, we will only merge adjacent regions
133    */
134   public RegionMergeTransactionImpl(final Region a, final Region b,
135       final boolean forcible) {
136     this(a, b, forcible, EnvironmentEdgeManager.currentTime());
137   }
138   /**
139    * Constructor
140    * @param a region a to merge
141    * @param b region b to merge
142    * @param forcible if false, we will only merge adjacent regions
143    * @param masterSystemTime the time at the master side
144    */
145   public RegionMergeTransactionImpl(final Region a, final Region b,
146       final boolean forcible, long masterSystemTime) {
147     if (a.getRegionInfo().compareTo(b.getRegionInfo()) <= 0) {
148       this.region_a = (HRegion)a;
149       this.region_b = (HRegion)b;
150     } else {
151       this.region_a = (HRegion)b;
152       this.region_b = (HRegion)a;
153     }
154     this.forcible = forcible;
155     this.masterSystemTime = masterSystemTime;
156     this.mergesdir = region_a.getRegionFileSystem().getMergesDir();
157   }
158 
159   private void transition(RegionMergeTransactionPhase nextPhase) throws IOException {
160     transition(nextPhase, false);
161   }
162 
163   private void transition(RegionMergeTransactionPhase nextPhase, boolean isRollback)
164       throws IOException {
165     if (!isRollback) {
166       // Add to the journal first, because if the listener throws an exception
167       // we need to roll back starting at 'nextPhase'
168       this.journal.add(new JournalEntryImpl(nextPhase));
169     }
170     for (int i = 0; i < listeners.size(); i++) {
171       TransactionListener listener = listeners.get(i);
172       if (!isRollback) {
173         listener.transition(this, currentPhase, nextPhase);
174       } else {
175         listener.rollback(this, currentPhase, nextPhase);
176       }
177     }
178     currentPhase = nextPhase;
179   }
180 
181   /**
182    * Does checks on merge inputs.
183    * @param services
184    * @return <code>true</code> if the regions are mergeable else
185    *         <code>false</code> if they are not (e.g. its already closed, etc.).
186    */
187   @Override
188   public boolean prepare(final RegionServerServices services) throws IOException {
189     if (!region_a.getTableDesc().getTableName()
190         .equals(region_b.getTableDesc().getTableName())) {
191       LOG.info("Can't merge regions " + region_a + "," + region_b
192           + " because they do not belong to the same table");
193       return false;
194     }
195     if (region_a.getRegionInfo().equals(region_b.getRegionInfo())) {
196       LOG.info("Can't merge the same region " + region_a);
197       return false;
198     }
199     if (!forcible && !HRegionInfo.areAdjacent(region_a.getRegionInfo(),
200             region_b.getRegionInfo())) {
201       String msg = "Skip merging " + this.region_a.getRegionInfo().getRegionNameAsString()
202           + " and " + this.region_b.getRegionInfo().getRegionNameAsString()
203           + ", because they are not adjacent.";
204       LOG.info(msg);
205       return false;
206     }
207     if (!this.region_a.isMergeable() || !this.region_b.isMergeable()) {
208       return false;
209     }
210     try {
211       boolean regionAHasMergeQualifier = hasMergeQualifierInMeta(services,
212           region_a.getRegionInfo().getRegionName());
213       if (regionAHasMergeQualifier ||
214           hasMergeQualifierInMeta(services, region_b.getRegionInfo().getRegionName())) {
215         LOG.debug("Region " + (regionAHasMergeQualifier ?
216               region_a.getRegionInfo().getRegionNameAsString() :
217                 region_b.getRegionInfo().getRegionNameAsString())
218             + " is not mergeable because it has merge qualifier in META");
219         return false;
220       }
221     } catch (IOException e) {
222       LOG.warn("Failed judging whether merge transaction is available for "
223               + region_a.getRegionInfo().getRegionNameAsString() + " and "
224               + region_b.getRegionInfo().getRegionNameAsString(), e);
225       return false;
226     }
227 
228     // WARN: make sure there is no parent region of the two merging regions in
229     // hbase:meta If exists, fixing up daughters would cause daughter regions(we
230     // have merged one) online again when we restart master, so we should clear
231     // the parent region to prevent the above case
232     // Since HBASE-7721, we don't need fix up daughters any more. so here do
233     // nothing
234 
235     this.mergedRegionInfo = getMergedRegionInfo(region_a.getRegionInfo(),
236         region_b.getRegionInfo());
237 
238     transition(RegionMergeTransactionPhase.PREPARED);
239     return true;
240   }
241 
242   /**
243    * Run the transaction.
244    * @param server Hosting server instance. Can be null when testing
245    * @param services Used to online/offline regions.
246    * @throws IOException If thrown, transaction failed. Call
247    *           {@link #rollback(Server, RegionServerServices)}
248    * @return merged region
249    * @throws IOException
250    * @see #rollback(Server, RegionServerServices)
251    */
252   @Override
253   public HRegion execute(final Server server,
254       final RegionServerServices services) throws IOException {
255     if (User.isHBaseSecurityEnabled(region_a.getBaseConf())) {
256       LOG.warn("Should use execute(Server, RegionServerServices, User)");
257     }
258     return execute(server, services, null);
259   }
260 
261   @Override
262   public HRegion execute(final Server server, final RegionServerServices services, User user)
263       throws IOException {
264     this.server = server;
265     this.rsServices = services;
266     useCoordinationForAssignment =
267         server == null ? true : ConfigUtil.useZKForAssignment(server.getConfiguration());
268     if (rmd == null) {
269       rmd = server != null && server.getCoordinatedStateManager() != null ?
270           ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
271             .getRegionMergeCoordination().getDefaultDetails()
272         : null;
273     }
274     if (rsCoprocessorHost == null) {
275       rsCoprocessorHost = server != null ?
276         ((HRegionServer) server).getRegionServerCoprocessorHost() : null;
277     }
278     final HRegion mergedRegion = createMergedRegion(server, services, user);
279     if (rsCoprocessorHost != null) {
280       if (user == null) {
281         rsCoprocessorHost.postMergeCommit(this.region_a, this.region_b, mergedRegion);
282       } else {
283         try {
284           user.getUGI().doAs(new PrivilegedExceptionAction<Void>() {
285             @Override
286             public Void run() throws Exception {
287               rsCoprocessorHost.postMergeCommit(region_a, region_b, mergedRegion);
288               return null;
289             }
290           });
291         } catch (InterruptedException ie) {
292           InterruptedIOException iioe = new InterruptedIOException();
293           iioe.initCause(ie);
294           throw iioe;
295         }
296       }
297     }
298     stepsAfterPONR(server, services, mergedRegion, user);
299 
300     transition(RegionMergeTransactionPhase.COMPLETED);
301 
302     return mergedRegion;
303   }
304 
305   @Deprecated
306   public void stepsAfterPONR(final Server server, final RegionServerServices services,
307       final HRegion mergedRegion) throws IOException {
308     stepsAfterPONR(server, services, mergedRegion, null);
309   }
310 
311   public void stepsAfterPONR(final Server server, final RegionServerServices services,
312       final HRegion mergedRegion, User user) throws IOException {
313     openMergedRegion(server, services, mergedRegion);
314     if (useCoordination(server)) {
315       ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
316         .getRegionMergeCoordination().completeRegionMergeTransaction(services,
317           mergedRegionInfo, region_a, region_b, rmd, mergedRegion);
318     }
319     if (rsCoprocessorHost != null) {
320       if (user == null) {
321         rsCoprocessorHost.postMerge(region_a, region_b, mergedRegion);
322       } else {
323         try {
324           user.getUGI().doAs(new PrivilegedExceptionAction<Void>() {
325             @Override
326             public Void run() throws Exception {
327               rsCoprocessorHost.postMerge(region_a, region_b, mergedRegion);
328               return null;
329             }
330           });
331         } catch (InterruptedException ie) {
332           InterruptedIOException iioe = new InterruptedIOException();
333           iioe.initCause(ie);
334           throw iioe;
335         }
336       }
337     }
338   }
339 
340   /**
341    * Prepare the merged region and region files.
342    * @param server Hosting server instance. Can be null when testing
343    * @param services Used to online/offline regions.
344    * @return merged region
345    * @throws IOException If thrown, transaction failed. Call
346    *           {@link #rollback(Server, RegionServerServices)}
347    */
348   HRegion createMergedRegion(final Server server,
349       final RegionServerServices services, User user) throws IOException {
350     LOG.info("Starting merge of " + region_a + " and "
351         + region_b.getRegionInfo().getRegionNameAsString() + ", forcible=" + forcible);
352     if ((server != null && server.isStopped())
353         || (services != null && services.isStopping())) {
354       throw new IOException("Server is stopped or stopping");
355     }
356 
357     if (rsCoprocessorHost != null) {
358       boolean ret = false;
359       if (user == null) {
360         ret = rsCoprocessorHost.preMerge(region_a, region_b);
361       } else {
362         try {
363           ret = user.getUGI().doAs(new PrivilegedExceptionAction<Boolean>() {
364             @Override
365             public Boolean run() throws Exception {
366               return rsCoprocessorHost.preMerge(region_a, region_b);
367             }
368           });
369         } catch (InterruptedException ie) {
370           InterruptedIOException iioe = new InterruptedIOException();
371           iioe.initCause(ie);
372           throw iioe;
373         }
374       }
375       if (ret) {
376         throw new IOException("Coprocessor bypassing regions " + this.region_a + " "
377             + this.region_b + " merge.");
378       }
379     }
380 
381     // If true, no cluster to write meta edits to or to use coordination.
382     boolean testing = server == null ? true : server.getConfiguration()
383         .getBoolean("hbase.testing.nocluster", false);
384 
385     HRegion mergedRegion = stepsBeforePONR(server, services, testing);
386 
387     @MetaMutationAnnotation
388     final List<Mutation> metaEntries = new ArrayList<Mutation>();
389     if (rsCoprocessorHost != null) {
390       boolean ret = false;
391       if (user == null) {
392         ret = rsCoprocessorHost.preMergeCommit(region_a, region_b, metaEntries);
393       } else {
394         try {
395           ret = user.getUGI().doAs(new PrivilegedExceptionAction<Boolean>() {
396             @Override
397             public Boolean run() throws Exception {
398               return rsCoprocessorHost.preMergeCommit(region_a, region_b, metaEntries);
399             }
400           });
401         } catch (InterruptedException ie) {
402           InterruptedIOException iioe = new InterruptedIOException();
403           iioe.initCause(ie);
404           throw iioe;
405         }
406       }
407 
408       if (ret) {
409         throw new IOException("Coprocessor bypassing regions " + this.region_a + " "
410             + this.region_b + " merge.");
411       }
412       try {
413         for (Mutation p : metaEntries) {
414           HRegionInfo.parseRegionName(p.getRow());
415         }
416       } catch (IOException e) {
417         LOG.error("Row key of mutation from coprocessor is not parsable as region name."
418             + "Mutations from coprocessor should only be for hbase:meta table.", e);
419         throw e;
420       }
421     }
422 
423     // This is the point of no return. Similar with SplitTransaction.
424     // IF we reach the PONR then subsequent failures need to crash out this
425     // regionserver
426     transition(RegionMergeTransactionPhase.PONR);
427 
428     // Add merged region and delete region_a and region_b
429     // as an atomic update. See HBASE-7721. This update to hbase:meta makes the region
430     // will determine whether the region is merged or not in case of failures.
431     // If it is successful, master will roll-forward, if not, master will
432     // rollback
433     if (!testing && useCoordinationForAssignment) {
434       if (metaEntries.isEmpty()) {
435         MetaTableAccessor.mergeRegions(server.getConnection(),
436           mergedRegion.getRegionInfo(), region_a.getRegionInfo(), region_b.getRegionInfo(),
437           server.getServerName(), region_a.getTableDesc().getRegionReplication(), masterSystemTime);
438       } else {
439         mergeRegionsAndPutMetaEntries(server.getConnection(),
440           mergedRegion.getRegionInfo(), region_a.getRegionInfo(), region_b.getRegionInfo(),
441           server.getServerName(), metaEntries, region_a.getTableDesc().getRegionReplication());
442       }
443     } else if (services != null && !useCoordinationForAssignment) {
444       if (!services.reportRegionStateTransition(TransitionCode.MERGE_PONR,
445           mergedRegionInfo, region_a.getRegionInfo(), region_b.getRegionInfo())) {
446         // Passed PONR, let SSH clean it up
447         throw new IOException("Failed to notify master that merge passed PONR: "
448           + region_a.getRegionInfo().getRegionNameAsString() + " and "
449           + region_b.getRegionInfo().getRegionNameAsString());
450       }
451     }
452     return mergedRegion;
453   }
454 
455   private void mergeRegionsAndPutMetaEntries(HConnection hConnection,
456       HRegionInfo mergedRegion, HRegionInfo regionA, HRegionInfo regionB,
457       ServerName serverName, List<Mutation> metaEntries,
458       int regionReplication) throws IOException {
459     prepareMutationsForMerge(mergedRegion, regionA, regionB, serverName, metaEntries,
460       regionReplication);
461     MetaTableAccessor.mutateMetaTable(hConnection, metaEntries);
462   }
463 
464   public void prepareMutationsForMerge(HRegionInfo mergedRegion, HRegionInfo regionA,
465       HRegionInfo regionB, ServerName serverName, List<Mutation> mutations,
466       int regionReplication) throws IOException {
467     HRegionInfo copyOfMerged = new HRegionInfo(mergedRegion);
468 
469     // use the maximum of what master passed us vs local time.
470     long time = Math.max(EnvironmentEdgeManager.currentTime(), masterSystemTime);
471 
472     // Put for parent
473     Put putOfMerged = MetaTableAccessor.makePutFromRegionInfo(copyOfMerged, time);
474     putOfMerged.add(HConstants.CATALOG_FAMILY, HConstants.MERGEA_QUALIFIER,
475       regionA.toByteArray());
476     putOfMerged.add(HConstants.CATALOG_FAMILY, HConstants.MERGEB_QUALIFIER,
477       regionB.toByteArray());
478     mutations.add(putOfMerged);
479     // Deletes for merging regions
480     Delete deleteA = MetaTableAccessor.makeDeleteFromRegionInfo(regionA, time);
481     Delete deleteB = MetaTableAccessor.makeDeleteFromRegionInfo(regionB, time);
482     mutations.add(deleteA);
483     mutations.add(deleteB);
484 
485     // Add empty locations for region replicas of the merged region so that number of replicas
486     // can be cached whenever the primary region is looked up from meta
487     for (int i = 1; i < regionReplication; i++) {
488       addEmptyLocation(putOfMerged, i);
489     }
490 
491     // The merged is a new region, openSeqNum = 1 is fine.
492     addLocation(putOfMerged, serverName, 1);
493   }
494 
495   public Put addLocation(final Put p, final ServerName sn, long openSeqNum) {
496     p.add(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER, Bytes
497         .toBytes(sn.getHostAndPort()));
498     p.add(HConstants.CATALOG_FAMILY, HConstants.STARTCODE_QUALIFIER, Bytes.toBytes(sn
499         .getStartcode()));
500     p.add(HConstants.CATALOG_FAMILY, HConstants.SEQNUM_QUALIFIER, Bytes.toBytes(openSeqNum));
501     return p;
502   }
503 
504   private static Put addEmptyLocation(final Put p, int replicaId) {
505     p.addImmutable(HConstants.CATALOG_FAMILY, MetaTableAccessor.getServerColumn(replicaId), null);
506     p.addImmutable(HConstants.CATALOG_FAMILY, MetaTableAccessor.getStartCodeColumn(replicaId),
507       null);
508     p.addImmutable(HConstants.CATALOG_FAMILY, MetaTableAccessor.getSeqNumColumn(replicaId), null);
509     return p;
510   }
511 
512   public HRegion stepsBeforePONR(final Server server, final RegionServerServices services,
513       boolean testing) throws IOException {
514     if (rmd == null) {
515       rmd = server != null && server.getCoordinatedStateManager() != null ?
516           ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
517             .getRegionMergeCoordination().getDefaultDetails()
518         : null;
519     }
520 
521     // If server doesn't have a coordination state manager, don't do coordination actions.
522     if (useCoordination(server)) {
523       try {
524         ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
525             .getRegionMergeCoordination().startRegionMergeTransaction(mergedRegionInfo,
526               server.getServerName(), region_a.getRegionInfo(), region_b.getRegionInfo());
527       } catch (IOException e) {
528         throw new IOException("Failed to start region merge transaction for "
529             + this.mergedRegionInfo.getRegionNameAsString(), e);
530       }
531     } else if (services != null && !useCoordinationForAssignment) {
532       if (!services.reportRegionStateTransition(TransitionCode.READY_TO_MERGE,
533           mergedRegionInfo, region_a.getRegionInfo(), region_b.getRegionInfo())) {
534         throw new IOException("Failed to get ok from master to merge "
535           + region_a.getRegionInfo().getRegionNameAsString() + " and "
536           + region_b.getRegionInfo().getRegionNameAsString());
537       }
538     }
539     transition(RegionMergeTransactionPhase.SET_MERGING);
540     if (useCoordination(server)) {
541       // After creating the merge node, wait for master to transition it
542       // from PENDING_MERGE to MERGING so that we can move on. We want master
543       // knows about it and won't transition any region which is merging.
544       ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
545         .getRegionMergeCoordination().waitForRegionMergeTransaction(services,
546           mergedRegionInfo, region_a, region_b, rmd);
547     }
548 
549     this.region_a.getRegionFileSystem().createMergesDir();
550     transition(RegionMergeTransactionPhase.CREATED_MERGE_DIR);
551 
552     Map<byte[], List<StoreFile>> hstoreFilesOfRegionA = closeAndOfflineRegion(
553         services, this.region_a, true, testing);
554     Map<byte[], List<StoreFile>> hstoreFilesOfRegionB = closeAndOfflineRegion(
555         services, this.region_b, false, testing);
556 
557     assert hstoreFilesOfRegionA != null && hstoreFilesOfRegionB != null;
558 
559 
560     //
561     // mergeStoreFiles creates merged region dirs under the region_a merges dir
562     // Nothing to unroll here if failure -- clean up of CREATE_MERGE_DIR will
563     // clean this up.
564     mergeStoreFiles(hstoreFilesOfRegionA, hstoreFilesOfRegionB);
565 
566     if (useCoordination(server)) {
567       try {
568         // Do the final check in case any merging region is moved somehow. If so, the transition
569         // will fail.
570         ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
571             .getRegionMergeCoordination().confirmRegionMergeTransaction(this.mergedRegionInfo,
572               region_a.getRegionInfo(), region_b.getRegionInfo(), server.getServerName(), rmd);
573       } catch (IOException e) {
574         throw new IOException("Failed setting MERGING on "
575             + this.mergedRegionInfo.getRegionNameAsString(), e);
576       }
577     }
578 
579     // Log to the journal that we are creating merged region. We could fail
580     // halfway through. If we do, we could have left
581     // stuff in fs that needs cleanup -- a storefile or two. Thats why we
582     // add entry to journal BEFORE rather than AFTER the change.
583     transition(RegionMergeTransactionPhase.STARTED_MERGED_REGION_CREATION);
584     HRegion mergedRegion = createMergedRegionFromMerges(this.region_a,
585         this.region_b, this.mergedRegionInfo);
586     return mergedRegion;
587   }
588 
589   /**
590    * Create a merged region from the merges directory under region a. In order
591    * to mock it for tests, place it with a new method.
592    * @param a hri of region a
593    * @param b hri of region b
594    * @param mergedRegion hri of merged region
595    * @return merged HRegion.
596    * @throws IOException
597    */
598   HRegion createMergedRegionFromMerges(final HRegion a, final HRegion b,
599       final HRegionInfo mergedRegion) throws IOException {
600     return a.createMergedRegionFromMerges(mergedRegion, b);
601   }
602 
603   /**
604    * Close the merging region and offline it in regionserver
605    * @param services
606    * @param region
607    * @param isRegionA true if it is merging region a, false if it is region b
608    * @param testing true if it is testing
609    * @return a map of family name to list of store files
610    * @throws IOException
611    */
612   private Map<byte[], List<StoreFile>> closeAndOfflineRegion(
613       final RegionServerServices services, final HRegion region,
614       final boolean isRegionA, final boolean testing) throws IOException {
615     Map<byte[], List<StoreFile>> hstoreFilesToMerge = null;
616     Exception exceptionToThrow = null;
617     try {
618       hstoreFilesToMerge = region.close(false);
619     } catch (Exception e) {
620       exceptionToThrow = e;
621     }
622     if (exceptionToThrow == null && hstoreFilesToMerge == null) {
623       // The region was closed by a concurrent thread. We can't continue
624       // with the merge, instead we must just abandon the merge. If we
625       // reopen or merge this could cause problems because the region has
626       // probably already been moved to a different server, or is in the
627       // process of moving to a different server.
628       exceptionToThrow = closedByOtherException;
629     }
630     if (exceptionToThrow != closedByOtherException) {
631       transition(isRegionA ? RegionMergeTransactionPhase.CLOSED_REGION_A
632           : RegionMergeTransactionPhase.CLOSED_REGION_B);
633     }
634     if (exceptionToThrow != null) {
635       if (exceptionToThrow instanceof IOException)
636         throw (IOException) exceptionToThrow;
637       throw new IOException(exceptionToThrow);
638     }
639 
640     if (!testing) {
641       services.removeFromOnlineRegions(region, null);
642     }
643     transition(isRegionA ? RegionMergeTransactionPhase.OFFLINED_REGION_A
644         : RegionMergeTransactionPhase.OFFLINED_REGION_B);
645     return hstoreFilesToMerge;
646   }
647 
648   /**
649    * Get merged region info through the specified two regions
650    * @param a merging region A
651    * @param b merging region B
652    * @return the merged region info
653    */
654   public static HRegionInfo getMergedRegionInfo(final HRegionInfo a,
655       final HRegionInfo b) {
656     long rid = EnvironmentEdgeManager.currentTime();
657     // Regionid is timestamp. Merged region's id can't be less than that of
658     // merging regions else will insert at wrong location in hbase:meta
659     if (rid < a.getRegionId() || rid < b.getRegionId()) {
660       LOG.warn("Clock skew; merging regions id are " + a.getRegionId()
661           + " and " + b.getRegionId() + ", but current time here is " + rid);
662       rid = Math.max(a.getRegionId(), b.getRegionId()) + 1;
663     }
664 
665     byte[] startKey = null;
666     byte[] endKey = null;
667     // Choose the smaller as start key
668     if (a.compareTo(b) <= 0) {
669       startKey = a.getStartKey();
670     } else {
671       startKey = b.getStartKey();
672     }
673     // Choose the bigger as end key
674     if (Bytes.equals(a.getEndKey(), HConstants.EMPTY_BYTE_ARRAY)
675         || (!Bytes.equals(b.getEndKey(), HConstants.EMPTY_BYTE_ARRAY)
676             && Bytes.compareTo(a.getEndKey(), b.getEndKey()) > 0)) {
677       endKey = a.getEndKey();
678     } else {
679       endKey = b.getEndKey();
680     }
681 
682     // Merged region is sorted between two merging regions in META
683     HRegionInfo mergedRegionInfo = new HRegionInfo(a.getTable(), startKey,
684         endKey, false, rid);
685     return mergedRegionInfo;
686   }
687 
688   /**
689    * Perform time consuming opening of the merged region.
690    * @param server Hosting server instance. Can be null when testing
691    * @param services Used to online/offline regions.
692    * @param merged the merged region
693    * @throws IOException If thrown, transaction failed. Call
694    *           {@link #rollback(Server, RegionServerServices)}
695    */
696   void openMergedRegion(final Server server,
697       final RegionServerServices services, HRegion merged) throws IOException {
698     boolean stopped = server != null && server.isStopped();
699     boolean stopping = services != null && services.isStopping();
700     if (stopped || stopping) {
701       LOG.info("Not opening merged region  " + merged.getRegionInfo().getRegionNameAsString()
702           + " because stopping=" + stopping + ", stopped=" + stopped);
703       return;
704     }
705     HRegionInfo hri = merged.getRegionInfo();
706     LoggingProgressable reporter = server == null ? null
707         : new LoggingProgressable(hri, server.getConfiguration().getLong(
708             "hbase.regionserver.regionmerge.open.log.interval", 10000));
709     merged.openHRegion(reporter);
710 
711     if (services != null) {
712       try {
713         if (useCoordinationForAssignment) {
714           services.postOpenDeployTasks(merged);
715         } else if (!services.reportRegionStateTransition(TransitionCode.MERGED,
716             mergedRegionInfo, region_a.getRegionInfo(), region_b.getRegionInfo())) {
717           throw new IOException("Failed to report merged region to master: "
718             + mergedRegionInfo.getShortNameToLog());
719         }
720         services.addToOnlineRegions(merged);
721       } catch (KeeperException ke) {
722         throw new IOException(ke);
723       }
724     }
725 
726   }
727 
728   /**
729    * Create reference file(s) of merging regions under the region_a merges dir
730    * @param hstoreFilesOfRegionA
731    * @param hstoreFilesOfRegionB
732    * @throws IOException
733    */
734   private void mergeStoreFiles(
735       Map<byte[], List<StoreFile>> hstoreFilesOfRegionA,
736       Map<byte[], List<StoreFile>> hstoreFilesOfRegionB)
737       throws IOException {
738     // Create reference file(s) of region A in mergdir
739     HRegionFileSystem fs_a = this.region_a.getRegionFileSystem();
740     for (Map.Entry<byte[], List<StoreFile>> entry : hstoreFilesOfRegionA
741         .entrySet()) {
742       String familyName = Bytes.toString(entry.getKey());
743       for (StoreFile storeFile : entry.getValue()) {
744         fs_a.mergeStoreFile(this.mergedRegionInfo, familyName, storeFile,
745             this.mergesdir);
746       }
747     }
748     // Create reference file(s) of region B in mergedir
749     HRegionFileSystem fs_b = this.region_b.getRegionFileSystem();
750     for (Map.Entry<byte[], List<StoreFile>> entry : hstoreFilesOfRegionB
751         .entrySet()) {
752       String familyName = Bytes.toString(entry.getKey());
753       for (StoreFile storeFile : entry.getValue()) {
754         fs_b.mergeStoreFile(this.mergedRegionInfo, familyName, storeFile,
755             this.mergesdir);
756       }
757     }
758   }
759 
760   /**
761    * @param server Hosting server instance (May be null when testing).
762    * @param services Services of regionserver, used to online regions.
763    * @throws IOException If thrown, rollback failed. Take drastic action.
764    * @return True if we successfully rolled back, false if we got to the point
765    *         of no return and so now need to abort the server to minimize
766    *         damage.
767    */
768   @Override
769   @SuppressWarnings("deprecation")
770   public boolean rollback(final Server server,
771       final RegionServerServices services) throws IOException {
772     if (User.isHBaseSecurityEnabled(region_a.getBaseConf())) {
773       LOG.warn("Should use execute(Server, RegionServerServices, User)");
774     }
775     return rollback(server, services, null);
776   }
777 
778   @Override
779   public boolean rollback(final Server server,
780       final RegionServerServices services, User user) throws IOException {
781     assert this.mergedRegionInfo != null;
782     // Coprocessor callback
783     if (rsCoprocessorHost != null) {
784       if (user == null) {
785         rsCoprocessorHost.preRollBackMerge(region_a, region_b);
786       } else {
787         try {
788           user.getUGI().doAs(new PrivilegedExceptionAction<Void>() {
789             @Override
790             public Void run() throws Exception {
791               rsCoprocessorHost.preRollBackMerge(region_a, region_b);
792               return null;
793             }
794           });
795         } catch (InterruptedException ie) {
796           InterruptedIOException iioe = new InterruptedIOException();
797           iioe.initCause(ie);
798           throw iioe;
799         }
800       }
801     }
802 
803     boolean result = true;
804     ListIterator<JournalEntry> iterator = this.journal
805         .listIterator(this.journal.size());
806     // Iterate in reverse.
807     while (iterator.hasPrevious()) {
808       JournalEntry je = iterator.previous();
809 
810       transition(je.getPhase(), true);
811 
812       switch (je.getPhase()) {
813 
814         case SET_MERGING:
815         if (useCoordination(server)) {
816           ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
817               .getRegionMergeCoordination().clean(this.mergedRegionInfo);
818           } else if (services != null && !useCoordinationForAssignment
819               && !services.reportRegionStateTransition(TransitionCode.MERGE_REVERTED,
820                   mergedRegionInfo, region_a.getRegionInfo(), region_b.getRegionInfo())) {
821             return false;
822         }
823           break;
824 
825         case CREATED_MERGE_DIR:
826           this.region_a.writestate.writesEnabled = true;
827           this.region_b.writestate.writesEnabled = true;
828           this.region_a.getRegionFileSystem().cleanupMergesDir();
829           break;
830 
831         case CLOSED_REGION_A:
832           try {
833             // So, this returns a seqid but if we just closed and then reopened,
834             // we should be ok. On close, we flushed using sequenceid obtained
835             // from hosting regionserver so no need to propagate the sequenceid
836             // returned out of initialize below up into regionserver as we
837             // normally do.
838             this.region_a.initialize();
839           } catch (IOException e) {
840             LOG.error("Failed rollbacking CLOSED_REGION_A of region "
841                 + this.region_a.getRegionInfo().getRegionNameAsString(), e);
842             throw new RuntimeException(e);
843           }
844           break;
845 
846         case OFFLINED_REGION_A:
847           if (services != null)
848             services.addToOnlineRegions(this.region_a);
849           break;
850 
851         case CLOSED_REGION_B:
852           try {
853             this.region_b.initialize();
854           } catch (IOException e) {
855             LOG.error("Failed rollbacking CLOSED_REGION_A of region "
856                 + this.region_b.getRegionInfo().getRegionNameAsString(), e);
857             throw new RuntimeException(e);
858           }
859           break;
860 
861         case OFFLINED_REGION_B:
862           if (services != null)
863             services.addToOnlineRegions(this.region_b);
864           break;
865 
866         case STARTED_MERGED_REGION_CREATION:
867           this.region_a.getRegionFileSystem().cleanupMergedRegion(
868               this.mergedRegionInfo);
869           break;
870 
871         case PONR:
872           // We got to the point-of-no-return so we need to just abort. Return
873           // immediately. Do not clean up created merged regions.
874           return false;
875 
876         // Informational only cases
877         case STARTED:
878         case PREPARED:
879         case COMPLETED:
880           break;
881 
882         default:
883           throw new RuntimeException("Unhandled journal entry: " + je);
884       }
885     }
886     // Coprocessor callback
887     if (rsCoprocessorHost != null) {
888       if (user == null) {
889         rsCoprocessorHost.postRollBackMerge(region_a, region_b);
890       } else {
891         try {
892           user.getUGI().doAs(new PrivilegedExceptionAction<Void>() {
893             @Override
894             public Void run() throws Exception {
895               rsCoprocessorHost.postRollBackMerge(region_a, region_b);
896               return null;
897             }
898           });
899         } catch (InterruptedException ie) {
900           InterruptedIOException iioe = new InterruptedIOException();
901           iioe.initCause(ie);
902           throw iioe;
903         }
904       }
905     }
906 
907     return result;
908   }
909 
910   @Override
911   public HRegionInfo getMergedRegionInfo() {
912     return this.mergedRegionInfo;
913   }
914 
915   // For unit testing.
916   Path getMergesDir() {
917     return this.mergesdir;
918   }
919 
920   private boolean useCoordination(final Server server) {
921     return server != null && useCoordinationForAssignment
922         && server.getCoordinatedStateManager() != null;
923   }
924 
925 
926 
927   /**
928    * Checks if the given region has merge qualifier in hbase:meta
929    * @param services
930    * @param regionName name of specified region
931    * @return true if the given region has merge qualifier in META.(It will be
932    *         cleaned by CatalogJanitor)
933    * @throws IOException
934    */
935   boolean hasMergeQualifierInMeta(final RegionServerServices services,
936       final byte[] regionName) throws IOException {
937     if (services == null) return false;
938     // Get merge regions if it is a merged region and already has merge
939     // qualifier
940     Pair<HRegionInfo, HRegionInfo> mergeRegions = MetaTableAccessor
941         .getRegionsFromMergeQualifier(services.getConnection(), regionName);
942     if (mergeRegions != null &&
943         (mergeRegions.getFirst() != null || mergeRegions.getSecond() != null)) {
944       // It has merge qualifier
945       return true;
946     }
947     return false;
948   }
949 
950   @Override
951   public List<JournalEntry> getJournal() {
952     return journal;
953   }
954 
955   @Override
956   public RegionMergeTransaction registerTransactionListener(TransactionListener listener) {
957     listeners.add(listener);
958     return this;
959   }
960 
961   @Override
962   public Server getServer() {
963     return server;
964   }
965 
966   @Override
967   public RegionServerServices getRegionServerServices() {
968     return rsServices;
969   }
970 
971 }