1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.regionserver;
20
21 import java.io.IOException;
22 import java.io.InterruptedIOException;
23 import java.security.PrivilegedExceptionAction;
24 import java.util.ArrayList;
25 import java.util.List;
26 import java.util.ListIterator;
27 import java.util.Map;
28
29 import org.apache.commons.logging.Log;
30 import org.apache.commons.logging.LogFactory;
31 import org.apache.hadoop.hbase.classification.InterfaceAudience;
32 import org.apache.hadoop.fs.Path;
33 import org.apache.hadoop.hbase.HConstants;
34 import org.apache.hadoop.hbase.HRegionInfo;
35 import org.apache.hadoop.hbase.MetaMutationAnnotation;
36 import org.apache.hadoop.hbase.Server;
37 import org.apache.hadoop.hbase.ServerName;
38 import org.apache.hadoop.hbase.MetaTableAccessor;
39 import org.apache.hadoop.hbase.client.Delete;
40 import org.apache.hadoop.hbase.client.HConnection;
41 import org.apache.hadoop.hbase.client.Mutation;
42 import org.apache.hadoop.hbase.client.Put;
43 import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
44 import org.apache.hadoop.hbase.coordination.RegionMergeCoordination.RegionMergeDetails;
45 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
46 import org.apache.hadoop.hbase.regionserver.SplitTransactionImpl.LoggingProgressable;
47 import org.apache.hadoop.hbase.security.User;
48 import org.apache.hadoop.hbase.util.Bytes;
49 import org.apache.hadoop.hbase.util.ConfigUtil;
50 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
51 import org.apache.hadoop.hbase.util.Pair;
52 import org.apache.zookeeper.KeeperException;
53
54 @InterfaceAudience.Private
55 public class RegionMergeTransactionImpl implements RegionMergeTransaction {
56 private static final Log LOG = LogFactory.getLog(RegionMergeTransactionImpl.class);
57
58
59 private HRegionInfo mergedRegionInfo;
60
61 private final HRegion region_a;
62 private final HRegion region_b;
63
64 private final Path mergesdir;
65
66 private final boolean forcible;
67 private boolean useCoordinationForAssignment;
68 private final long masterSystemTime;
69
70
71
72
73
74 private RegionMergeTransactionPhase currentPhase = RegionMergeTransactionPhase.STARTED;
75 private Server server;
76 private RegionServerServices rsServices;
77
78 public static class JournalEntryImpl implements JournalEntry {
79 private RegionMergeTransactionPhase type;
80 private long timestamp;
81
82 public JournalEntryImpl(RegionMergeTransactionPhase type) {
83 this(type, EnvironmentEdgeManager.currentTime());
84 }
85
86 public JournalEntryImpl(RegionMergeTransactionPhase type, long timestamp) {
87 this.type = type;
88 this.timestamp = timestamp;
89 }
90
91 @Override
92 public String toString() {
93 StringBuilder sb = new StringBuilder();
94 sb.append(type);
95 sb.append(" at ");
96 sb.append(timestamp);
97 return sb.toString();
98 }
99
100 @Override
101 public RegionMergeTransactionPhase getPhase() {
102 return type;
103 }
104
105 @Override
106 public long getTimeStamp() {
107 return timestamp;
108 }
109 }
110
111
112
113
114 private final List<JournalEntry> journal = new ArrayList<JournalEntry>();
115
116
117
118
119 private final ArrayList<TransactionListener> listeners = new ArrayList<TransactionListener>();
120
121 private static IOException closedByOtherException = new IOException(
122 "Failed to close region: already closed by another thread");
123
124 private RegionServerCoprocessorHost rsCoprocessorHost = null;
125
126 private RegionMergeDetails rmd;
127
128
129
130
131
132
133
134 public RegionMergeTransactionImpl(final Region a, final Region b,
135 final boolean forcible) {
136 this(a, b, forcible, EnvironmentEdgeManager.currentTime());
137 }
138
139
140
141
142
143
144
145 public RegionMergeTransactionImpl(final Region a, final Region b,
146 final boolean forcible, long masterSystemTime) {
147 if (a.getRegionInfo().compareTo(b.getRegionInfo()) <= 0) {
148 this.region_a = (HRegion)a;
149 this.region_b = (HRegion)b;
150 } else {
151 this.region_a = (HRegion)b;
152 this.region_b = (HRegion)a;
153 }
154 this.forcible = forcible;
155 this.masterSystemTime = masterSystemTime;
156 this.mergesdir = region_a.getRegionFileSystem().getMergesDir();
157 }
158
159 private void transition(RegionMergeTransactionPhase nextPhase) throws IOException {
160 transition(nextPhase, false);
161 }
162
163 private void transition(RegionMergeTransactionPhase nextPhase, boolean isRollback)
164 throws IOException {
165 if (!isRollback) {
166
167
168 this.journal.add(new JournalEntryImpl(nextPhase));
169 }
170 for (int i = 0; i < listeners.size(); i++) {
171 TransactionListener listener = listeners.get(i);
172 if (!isRollback) {
173 listener.transition(this, currentPhase, nextPhase);
174 } else {
175 listener.rollback(this, currentPhase, nextPhase);
176 }
177 }
178 currentPhase = nextPhase;
179 }
180
181
182
183
184
185
186
187 @Override
188 public boolean prepare(final RegionServerServices services) throws IOException {
189 if (!region_a.getTableDesc().getTableName()
190 .equals(region_b.getTableDesc().getTableName())) {
191 LOG.info("Can't merge regions " + region_a + "," + region_b
192 + " because they do not belong to the same table");
193 return false;
194 }
195 if (region_a.getRegionInfo().equals(region_b.getRegionInfo())) {
196 LOG.info("Can't merge the same region " + region_a);
197 return false;
198 }
199 if (!forcible && !HRegionInfo.areAdjacent(region_a.getRegionInfo(),
200 region_b.getRegionInfo())) {
201 String msg = "Skip merging " + this.region_a.getRegionInfo().getRegionNameAsString()
202 + " and " + this.region_b.getRegionInfo().getRegionNameAsString()
203 + ", because they are not adjacent.";
204 LOG.info(msg);
205 return false;
206 }
207 if (!this.region_a.isMergeable() || !this.region_b.isMergeable()) {
208 return false;
209 }
210 try {
211 boolean regionAHasMergeQualifier = hasMergeQualifierInMeta(services,
212 region_a.getRegionInfo().getRegionName());
213 if (regionAHasMergeQualifier ||
214 hasMergeQualifierInMeta(services, region_b.getRegionInfo().getRegionName())) {
215 LOG.debug("Region " + (regionAHasMergeQualifier ?
216 region_a.getRegionInfo().getRegionNameAsString() :
217 region_b.getRegionInfo().getRegionNameAsString())
218 + " is not mergeable because it has merge qualifier in META");
219 return false;
220 }
221 } catch (IOException e) {
222 LOG.warn("Failed judging whether merge transaction is available for "
223 + region_a.getRegionInfo().getRegionNameAsString() + " and "
224 + region_b.getRegionInfo().getRegionNameAsString(), e);
225 return false;
226 }
227
228
229
230
231
232
233
234
235 this.mergedRegionInfo = getMergedRegionInfo(region_a.getRegionInfo(),
236 region_b.getRegionInfo());
237
238 transition(RegionMergeTransactionPhase.PREPARED);
239 return true;
240 }
241
242
243
244
245
246
247
248
249
250
251
252 @Override
253 public HRegion execute(final Server server,
254 final RegionServerServices services) throws IOException {
255 if (User.isHBaseSecurityEnabled(region_a.getBaseConf())) {
256 LOG.warn("Should use execute(Server, RegionServerServices, User)");
257 }
258 return execute(server, services, null);
259 }
260
261 @Override
262 public HRegion execute(final Server server, final RegionServerServices services, User user)
263 throws IOException {
264 this.server = server;
265 this.rsServices = services;
266 useCoordinationForAssignment =
267 server == null ? true : ConfigUtil.useZKForAssignment(server.getConfiguration());
268 if (rmd == null) {
269 rmd = server != null && server.getCoordinatedStateManager() != null ?
270 ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
271 .getRegionMergeCoordination().getDefaultDetails()
272 : null;
273 }
274 if (rsCoprocessorHost == null) {
275 rsCoprocessorHost = server != null ?
276 ((HRegionServer) server).getRegionServerCoprocessorHost() : null;
277 }
278 final HRegion mergedRegion = createMergedRegion(server, services, user);
279 if (rsCoprocessorHost != null) {
280 if (user == null) {
281 rsCoprocessorHost.postMergeCommit(this.region_a, this.region_b, mergedRegion);
282 } else {
283 try {
284 user.getUGI().doAs(new PrivilegedExceptionAction<Void>() {
285 @Override
286 public Void run() throws Exception {
287 rsCoprocessorHost.postMergeCommit(region_a, region_b, mergedRegion);
288 return null;
289 }
290 });
291 } catch (InterruptedException ie) {
292 InterruptedIOException iioe = new InterruptedIOException();
293 iioe.initCause(ie);
294 throw iioe;
295 }
296 }
297 }
298 stepsAfterPONR(server, services, mergedRegion, user);
299
300 transition(RegionMergeTransactionPhase.COMPLETED);
301
302 return mergedRegion;
303 }
304
305 @Deprecated
306 public void stepsAfterPONR(final Server server, final RegionServerServices services,
307 final HRegion mergedRegion) throws IOException {
308 stepsAfterPONR(server, services, mergedRegion, null);
309 }
310
311 public void stepsAfterPONR(final Server server, final RegionServerServices services,
312 final HRegion mergedRegion, User user) throws IOException {
313 openMergedRegion(server, services, mergedRegion);
314 if (useCoordination(server)) {
315 ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
316 .getRegionMergeCoordination().completeRegionMergeTransaction(services,
317 mergedRegionInfo, region_a, region_b, rmd, mergedRegion);
318 }
319 if (rsCoprocessorHost != null) {
320 if (user == null) {
321 rsCoprocessorHost.postMerge(region_a, region_b, mergedRegion);
322 } else {
323 try {
324 user.getUGI().doAs(new PrivilegedExceptionAction<Void>() {
325 @Override
326 public Void run() throws Exception {
327 rsCoprocessorHost.postMerge(region_a, region_b, mergedRegion);
328 return null;
329 }
330 });
331 } catch (InterruptedException ie) {
332 InterruptedIOException iioe = new InterruptedIOException();
333 iioe.initCause(ie);
334 throw iioe;
335 }
336 }
337 }
338 }
339
340
341
342
343
344
345
346
347
348 HRegion createMergedRegion(final Server server,
349 final RegionServerServices services, User user) throws IOException {
350 LOG.info("Starting merge of " + region_a + " and "
351 + region_b.getRegionInfo().getRegionNameAsString() + ", forcible=" + forcible);
352 if ((server != null && server.isStopped())
353 || (services != null && services.isStopping())) {
354 throw new IOException("Server is stopped or stopping");
355 }
356
357 if (rsCoprocessorHost != null) {
358 boolean ret = false;
359 if (user == null) {
360 ret = rsCoprocessorHost.preMerge(region_a, region_b);
361 } else {
362 try {
363 ret = user.getUGI().doAs(new PrivilegedExceptionAction<Boolean>() {
364 @Override
365 public Boolean run() throws Exception {
366 return rsCoprocessorHost.preMerge(region_a, region_b);
367 }
368 });
369 } catch (InterruptedException ie) {
370 InterruptedIOException iioe = new InterruptedIOException();
371 iioe.initCause(ie);
372 throw iioe;
373 }
374 }
375 if (ret) {
376 throw new IOException("Coprocessor bypassing regions " + this.region_a + " "
377 + this.region_b + " merge.");
378 }
379 }
380
381
382 boolean testing = server == null ? true : server.getConfiguration()
383 .getBoolean("hbase.testing.nocluster", false);
384
385 HRegion mergedRegion = stepsBeforePONR(server, services, testing);
386
387 @MetaMutationAnnotation
388 final List<Mutation> metaEntries = new ArrayList<Mutation>();
389 if (rsCoprocessorHost != null) {
390 boolean ret = false;
391 if (user == null) {
392 ret = rsCoprocessorHost.preMergeCommit(region_a, region_b, metaEntries);
393 } else {
394 try {
395 ret = user.getUGI().doAs(new PrivilegedExceptionAction<Boolean>() {
396 @Override
397 public Boolean run() throws Exception {
398 return rsCoprocessorHost.preMergeCommit(region_a, region_b, metaEntries);
399 }
400 });
401 } catch (InterruptedException ie) {
402 InterruptedIOException iioe = new InterruptedIOException();
403 iioe.initCause(ie);
404 throw iioe;
405 }
406 }
407
408 if (ret) {
409 throw new IOException("Coprocessor bypassing regions " + this.region_a + " "
410 + this.region_b + " merge.");
411 }
412 try {
413 for (Mutation p : metaEntries) {
414 HRegionInfo.parseRegionName(p.getRow());
415 }
416 } catch (IOException e) {
417 LOG.error("Row key of mutation from coprocessor is not parsable as region name."
418 + "Mutations from coprocessor should only be for hbase:meta table.", e);
419 throw e;
420 }
421 }
422
423
424
425
426 transition(RegionMergeTransactionPhase.PONR);
427
428
429
430
431
432
433 if (!testing && useCoordinationForAssignment) {
434 if (metaEntries.isEmpty()) {
435 MetaTableAccessor.mergeRegions(server.getConnection(),
436 mergedRegion.getRegionInfo(), region_a.getRegionInfo(), region_b.getRegionInfo(),
437 server.getServerName(), region_a.getTableDesc().getRegionReplication(), masterSystemTime);
438 } else {
439 mergeRegionsAndPutMetaEntries(server.getConnection(),
440 mergedRegion.getRegionInfo(), region_a.getRegionInfo(), region_b.getRegionInfo(),
441 server.getServerName(), metaEntries, region_a.getTableDesc().getRegionReplication());
442 }
443 } else if (services != null && !useCoordinationForAssignment) {
444 if (!services.reportRegionStateTransition(TransitionCode.MERGE_PONR,
445 mergedRegionInfo, region_a.getRegionInfo(), region_b.getRegionInfo())) {
446
447 throw new IOException("Failed to notify master that merge passed PONR: "
448 + region_a.getRegionInfo().getRegionNameAsString() + " and "
449 + region_b.getRegionInfo().getRegionNameAsString());
450 }
451 }
452 return mergedRegion;
453 }
454
455 private void mergeRegionsAndPutMetaEntries(HConnection hConnection,
456 HRegionInfo mergedRegion, HRegionInfo regionA, HRegionInfo regionB,
457 ServerName serverName, List<Mutation> metaEntries,
458 int regionReplication) throws IOException {
459 prepareMutationsForMerge(mergedRegion, regionA, regionB, serverName, metaEntries,
460 regionReplication);
461 MetaTableAccessor.mutateMetaTable(hConnection, metaEntries);
462 }
463
464 public void prepareMutationsForMerge(HRegionInfo mergedRegion, HRegionInfo regionA,
465 HRegionInfo regionB, ServerName serverName, List<Mutation> mutations,
466 int regionReplication) throws IOException {
467 HRegionInfo copyOfMerged = new HRegionInfo(mergedRegion);
468
469
470 long time = Math.max(EnvironmentEdgeManager.currentTime(), masterSystemTime);
471
472
473 Put putOfMerged = MetaTableAccessor.makePutFromRegionInfo(copyOfMerged, time);
474 putOfMerged.add(HConstants.CATALOG_FAMILY, HConstants.MERGEA_QUALIFIER,
475 regionA.toByteArray());
476 putOfMerged.add(HConstants.CATALOG_FAMILY, HConstants.MERGEB_QUALIFIER,
477 regionB.toByteArray());
478 mutations.add(putOfMerged);
479
480 Delete deleteA = MetaTableAccessor.makeDeleteFromRegionInfo(regionA, time);
481 Delete deleteB = MetaTableAccessor.makeDeleteFromRegionInfo(regionB, time);
482 mutations.add(deleteA);
483 mutations.add(deleteB);
484
485
486
487 for (int i = 1; i < regionReplication; i++) {
488 addEmptyLocation(putOfMerged, i);
489 }
490
491
492 addLocation(putOfMerged, serverName, 1);
493 }
494
495 public Put addLocation(final Put p, final ServerName sn, long openSeqNum) {
496 p.add(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER, Bytes
497 .toBytes(sn.getHostAndPort()));
498 p.add(HConstants.CATALOG_FAMILY, HConstants.STARTCODE_QUALIFIER, Bytes.toBytes(sn
499 .getStartcode()));
500 p.add(HConstants.CATALOG_FAMILY, HConstants.SEQNUM_QUALIFIER, Bytes.toBytes(openSeqNum));
501 return p;
502 }
503
504 private static Put addEmptyLocation(final Put p, int replicaId) {
505 p.addImmutable(HConstants.CATALOG_FAMILY, MetaTableAccessor.getServerColumn(replicaId), null);
506 p.addImmutable(HConstants.CATALOG_FAMILY, MetaTableAccessor.getStartCodeColumn(replicaId),
507 null);
508 p.addImmutable(HConstants.CATALOG_FAMILY, MetaTableAccessor.getSeqNumColumn(replicaId), null);
509 return p;
510 }
511
512 public HRegion stepsBeforePONR(final Server server, final RegionServerServices services,
513 boolean testing) throws IOException {
514 if (rmd == null) {
515 rmd = server != null && server.getCoordinatedStateManager() != null ?
516 ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
517 .getRegionMergeCoordination().getDefaultDetails()
518 : null;
519 }
520
521
522 if (useCoordination(server)) {
523 try {
524 ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
525 .getRegionMergeCoordination().startRegionMergeTransaction(mergedRegionInfo,
526 server.getServerName(), region_a.getRegionInfo(), region_b.getRegionInfo());
527 } catch (IOException e) {
528 throw new IOException("Failed to start region merge transaction for "
529 + this.mergedRegionInfo.getRegionNameAsString(), e);
530 }
531 } else if (services != null && !useCoordinationForAssignment) {
532 if (!services.reportRegionStateTransition(TransitionCode.READY_TO_MERGE,
533 mergedRegionInfo, region_a.getRegionInfo(), region_b.getRegionInfo())) {
534 throw new IOException("Failed to get ok from master to merge "
535 + region_a.getRegionInfo().getRegionNameAsString() + " and "
536 + region_b.getRegionInfo().getRegionNameAsString());
537 }
538 }
539 transition(RegionMergeTransactionPhase.SET_MERGING);
540 if (useCoordination(server)) {
541
542
543
544 ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
545 .getRegionMergeCoordination().waitForRegionMergeTransaction(services,
546 mergedRegionInfo, region_a, region_b, rmd);
547 }
548
549 this.region_a.getRegionFileSystem().createMergesDir();
550 transition(RegionMergeTransactionPhase.CREATED_MERGE_DIR);
551
552 Map<byte[], List<StoreFile>> hstoreFilesOfRegionA = closeAndOfflineRegion(
553 services, this.region_a, true, testing);
554 Map<byte[], List<StoreFile>> hstoreFilesOfRegionB = closeAndOfflineRegion(
555 services, this.region_b, false, testing);
556
557 assert hstoreFilesOfRegionA != null && hstoreFilesOfRegionB != null;
558
559
560
561
562
563
564 mergeStoreFiles(hstoreFilesOfRegionA, hstoreFilesOfRegionB);
565
566 if (useCoordination(server)) {
567 try {
568
569
570 ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
571 .getRegionMergeCoordination().confirmRegionMergeTransaction(this.mergedRegionInfo,
572 region_a.getRegionInfo(), region_b.getRegionInfo(), server.getServerName(), rmd);
573 } catch (IOException e) {
574 throw new IOException("Failed setting MERGING on "
575 + this.mergedRegionInfo.getRegionNameAsString(), e);
576 }
577 }
578
579
580
581
582
583 transition(RegionMergeTransactionPhase.STARTED_MERGED_REGION_CREATION);
584 HRegion mergedRegion = createMergedRegionFromMerges(this.region_a,
585 this.region_b, this.mergedRegionInfo);
586 return mergedRegion;
587 }
588
589
590
591
592
593
594
595
596
597
598 HRegion createMergedRegionFromMerges(final HRegion a, final HRegion b,
599 final HRegionInfo mergedRegion) throws IOException {
600 return a.createMergedRegionFromMerges(mergedRegion, b);
601 }
602
603
604
605
606
607
608
609
610
611
612 private Map<byte[], List<StoreFile>> closeAndOfflineRegion(
613 final RegionServerServices services, final HRegion region,
614 final boolean isRegionA, final boolean testing) throws IOException {
615 Map<byte[], List<StoreFile>> hstoreFilesToMerge = null;
616 Exception exceptionToThrow = null;
617 try {
618 hstoreFilesToMerge = region.close(false);
619 } catch (Exception e) {
620 exceptionToThrow = e;
621 }
622 if (exceptionToThrow == null && hstoreFilesToMerge == null) {
623
624
625
626
627
628 exceptionToThrow = closedByOtherException;
629 }
630 if (exceptionToThrow != closedByOtherException) {
631 transition(isRegionA ? RegionMergeTransactionPhase.CLOSED_REGION_A
632 : RegionMergeTransactionPhase.CLOSED_REGION_B);
633 }
634 if (exceptionToThrow != null) {
635 if (exceptionToThrow instanceof IOException)
636 throw (IOException) exceptionToThrow;
637 throw new IOException(exceptionToThrow);
638 }
639
640 if (!testing) {
641 services.removeFromOnlineRegions(region, null);
642 }
643 transition(isRegionA ? RegionMergeTransactionPhase.OFFLINED_REGION_A
644 : RegionMergeTransactionPhase.OFFLINED_REGION_B);
645 return hstoreFilesToMerge;
646 }
647
648
649
650
651
652
653
654 public static HRegionInfo getMergedRegionInfo(final HRegionInfo a,
655 final HRegionInfo b) {
656 long rid = EnvironmentEdgeManager.currentTime();
657
658
659 if (rid < a.getRegionId() || rid < b.getRegionId()) {
660 LOG.warn("Clock skew; merging regions id are " + a.getRegionId()
661 + " and " + b.getRegionId() + ", but current time here is " + rid);
662 rid = Math.max(a.getRegionId(), b.getRegionId()) + 1;
663 }
664
665 byte[] startKey = null;
666 byte[] endKey = null;
667
668 if (a.compareTo(b) <= 0) {
669 startKey = a.getStartKey();
670 } else {
671 startKey = b.getStartKey();
672 }
673
674 if (Bytes.equals(a.getEndKey(), HConstants.EMPTY_BYTE_ARRAY)
675 || (!Bytes.equals(b.getEndKey(), HConstants.EMPTY_BYTE_ARRAY)
676 && Bytes.compareTo(a.getEndKey(), b.getEndKey()) > 0)) {
677 endKey = a.getEndKey();
678 } else {
679 endKey = b.getEndKey();
680 }
681
682
683 HRegionInfo mergedRegionInfo = new HRegionInfo(a.getTable(), startKey,
684 endKey, false, rid);
685 return mergedRegionInfo;
686 }
687
688
689
690
691
692
693
694
695
696 void openMergedRegion(final Server server,
697 final RegionServerServices services, HRegion merged) throws IOException {
698 boolean stopped = server != null && server.isStopped();
699 boolean stopping = services != null && services.isStopping();
700 if (stopped || stopping) {
701 LOG.info("Not opening merged region " + merged.getRegionInfo().getRegionNameAsString()
702 + " because stopping=" + stopping + ", stopped=" + stopped);
703 return;
704 }
705 HRegionInfo hri = merged.getRegionInfo();
706 LoggingProgressable reporter = server == null ? null
707 : new LoggingProgressable(hri, server.getConfiguration().getLong(
708 "hbase.regionserver.regionmerge.open.log.interval", 10000));
709 merged.openHRegion(reporter);
710
711 if (services != null) {
712 try {
713 if (useCoordinationForAssignment) {
714 services.postOpenDeployTasks(merged);
715 } else if (!services.reportRegionStateTransition(TransitionCode.MERGED,
716 mergedRegionInfo, region_a.getRegionInfo(), region_b.getRegionInfo())) {
717 throw new IOException("Failed to report merged region to master: "
718 + mergedRegionInfo.getShortNameToLog());
719 }
720 services.addToOnlineRegions(merged);
721 } catch (KeeperException ke) {
722 throw new IOException(ke);
723 }
724 }
725
726 }
727
728
729
730
731
732
733
734 private void mergeStoreFiles(
735 Map<byte[], List<StoreFile>> hstoreFilesOfRegionA,
736 Map<byte[], List<StoreFile>> hstoreFilesOfRegionB)
737 throws IOException {
738
739 HRegionFileSystem fs_a = this.region_a.getRegionFileSystem();
740 for (Map.Entry<byte[], List<StoreFile>> entry : hstoreFilesOfRegionA
741 .entrySet()) {
742 String familyName = Bytes.toString(entry.getKey());
743 for (StoreFile storeFile : entry.getValue()) {
744 fs_a.mergeStoreFile(this.mergedRegionInfo, familyName, storeFile,
745 this.mergesdir);
746 }
747 }
748
749 HRegionFileSystem fs_b = this.region_b.getRegionFileSystem();
750 for (Map.Entry<byte[], List<StoreFile>> entry : hstoreFilesOfRegionB
751 .entrySet()) {
752 String familyName = Bytes.toString(entry.getKey());
753 for (StoreFile storeFile : entry.getValue()) {
754 fs_b.mergeStoreFile(this.mergedRegionInfo, familyName, storeFile,
755 this.mergesdir);
756 }
757 }
758 }
759
760
761
762
763
764
765
766
767
768 @Override
769 @SuppressWarnings("deprecation")
770 public boolean rollback(final Server server,
771 final RegionServerServices services) throws IOException {
772 if (User.isHBaseSecurityEnabled(region_a.getBaseConf())) {
773 LOG.warn("Should use execute(Server, RegionServerServices, User)");
774 }
775 return rollback(server, services, null);
776 }
777
778 @Override
779 public boolean rollback(final Server server,
780 final RegionServerServices services, User user) throws IOException {
781 assert this.mergedRegionInfo != null;
782
783 if (rsCoprocessorHost != null) {
784 if (user == null) {
785 rsCoprocessorHost.preRollBackMerge(region_a, region_b);
786 } else {
787 try {
788 user.getUGI().doAs(new PrivilegedExceptionAction<Void>() {
789 @Override
790 public Void run() throws Exception {
791 rsCoprocessorHost.preRollBackMerge(region_a, region_b);
792 return null;
793 }
794 });
795 } catch (InterruptedException ie) {
796 InterruptedIOException iioe = new InterruptedIOException();
797 iioe.initCause(ie);
798 throw iioe;
799 }
800 }
801 }
802
803 boolean result = true;
804 ListIterator<JournalEntry> iterator = this.journal
805 .listIterator(this.journal.size());
806
807 while (iterator.hasPrevious()) {
808 JournalEntry je = iterator.previous();
809
810 transition(je.getPhase(), true);
811
812 switch (je.getPhase()) {
813
814 case SET_MERGING:
815 if (useCoordination(server)) {
816 ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
817 .getRegionMergeCoordination().clean(this.mergedRegionInfo);
818 } else if (services != null && !useCoordinationForAssignment
819 && !services.reportRegionStateTransition(TransitionCode.MERGE_REVERTED,
820 mergedRegionInfo, region_a.getRegionInfo(), region_b.getRegionInfo())) {
821 return false;
822 }
823 break;
824
825 case CREATED_MERGE_DIR:
826 this.region_a.writestate.writesEnabled = true;
827 this.region_b.writestate.writesEnabled = true;
828 this.region_a.getRegionFileSystem().cleanupMergesDir();
829 break;
830
831 case CLOSED_REGION_A:
832 try {
833
834
835
836
837
838 this.region_a.initialize();
839 } catch (IOException e) {
840 LOG.error("Failed rollbacking CLOSED_REGION_A of region "
841 + this.region_a.getRegionInfo().getRegionNameAsString(), e);
842 throw new RuntimeException(e);
843 }
844 break;
845
846 case OFFLINED_REGION_A:
847 if (services != null)
848 services.addToOnlineRegions(this.region_a);
849 break;
850
851 case CLOSED_REGION_B:
852 try {
853 this.region_b.initialize();
854 } catch (IOException e) {
855 LOG.error("Failed rollbacking CLOSED_REGION_A of region "
856 + this.region_b.getRegionInfo().getRegionNameAsString(), e);
857 throw new RuntimeException(e);
858 }
859 break;
860
861 case OFFLINED_REGION_B:
862 if (services != null)
863 services.addToOnlineRegions(this.region_b);
864 break;
865
866 case STARTED_MERGED_REGION_CREATION:
867 this.region_a.getRegionFileSystem().cleanupMergedRegion(
868 this.mergedRegionInfo);
869 break;
870
871 case PONR:
872
873
874 return false;
875
876
877 case STARTED:
878 case PREPARED:
879 case COMPLETED:
880 break;
881
882 default:
883 throw new RuntimeException("Unhandled journal entry: " + je);
884 }
885 }
886
887 if (rsCoprocessorHost != null) {
888 if (user == null) {
889 rsCoprocessorHost.postRollBackMerge(region_a, region_b);
890 } else {
891 try {
892 user.getUGI().doAs(new PrivilegedExceptionAction<Void>() {
893 @Override
894 public Void run() throws Exception {
895 rsCoprocessorHost.postRollBackMerge(region_a, region_b);
896 return null;
897 }
898 });
899 } catch (InterruptedException ie) {
900 InterruptedIOException iioe = new InterruptedIOException();
901 iioe.initCause(ie);
902 throw iioe;
903 }
904 }
905 }
906
907 return result;
908 }
909
910 @Override
911 public HRegionInfo getMergedRegionInfo() {
912 return this.mergedRegionInfo;
913 }
914
915
916 Path getMergesDir() {
917 return this.mergesdir;
918 }
919
920 private boolean useCoordination(final Server server) {
921 return server != null && useCoordinationForAssignment
922 && server.getCoordinatedStateManager() != null;
923 }
924
925
926
927
928
929
930
931
932
933
934
935 boolean hasMergeQualifierInMeta(final RegionServerServices services,
936 final byte[] regionName) throws IOException {
937 if (services == null) return false;
938
939
940 Pair<HRegionInfo, HRegionInfo> mergeRegions = MetaTableAccessor
941 .getRegionsFromMergeQualifier(services.getConnection(), regionName);
942 if (mergeRegions != null &&
943 (mergeRegions.getFirst() != null || mergeRegions.getSecond() != null)) {
944
945 return true;
946 }
947 return false;
948 }
949
950 @Override
951 public List<JournalEntry> getJournal() {
952 return journal;
953 }
954
955 @Override
956 public RegionMergeTransaction registerTransactionListener(TransactionListener listener) {
957 listeners.add(listener);
958 return this;
959 }
960
961 @Override
962 public Server getServer() {
963 return server;
964 }
965
966 @Override
967 public RegionServerServices getRegionServerServices() {
968 return rsServices;
969 }
970
971 }