1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.regionserver;
20
21 import java.io.IOException;
22 import java.io.InterruptedIOException;
23 import java.security.PrivilegedExceptionAction;
24 import java.util.ArrayList;
25 import java.util.List;
26 import java.util.ListIterator;
27 import java.util.Map;
28 import java.util.concurrent.Callable;
29 import java.util.concurrent.ExecutionException;
30 import java.util.concurrent.Executors;
31 import java.util.concurrent.Future;
32 import java.util.concurrent.ThreadFactory;
33 import java.util.concurrent.ThreadPoolExecutor;
34 import java.util.concurrent.TimeUnit;
35
36 import org.apache.commons.logging.Log;
37 import org.apache.commons.logging.LogFactory;
38 import org.apache.hadoop.fs.Path;
39 import org.apache.hadoop.hbase.classification.InterfaceAudience;
40 import org.apache.hadoop.hbase.HConstants;
41 import org.apache.hadoop.hbase.HRegionInfo;
42 import org.apache.hadoop.hbase.Server;
43 import org.apache.hadoop.hbase.ServerName;
44 import org.apache.hadoop.hbase.MetaTableAccessor;
45 import org.apache.hadoop.hbase.client.HConnection;
46 import org.apache.hadoop.hbase.client.Mutation;
47 import org.apache.hadoop.hbase.client.Put;
48 import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
49 import org.apache.hadoop.hbase.coordination.SplitTransactionCoordination;
50 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
51 import org.apache.hadoop.hbase.security.User;
52 import org.apache.hadoop.hbase.util.Bytes;
53 import org.apache.hadoop.hbase.util.CancelableProgressable;
54 import org.apache.hadoop.hbase.util.ConfigUtil;
55 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
56 import org.apache.hadoop.hbase.util.FSUtils;
57 import org.apache.hadoop.hbase.util.HasThread;
58 import org.apache.hadoop.hbase.util.Pair;
59 import org.apache.hadoop.hbase.util.PairOfSameType;
60 import org.apache.zookeeper.KeeperException;
61
62 import com.google.common.util.concurrent.ThreadFactoryBuilder;
63
64 @InterfaceAudience.Private
65 public class SplitTransactionImpl implements SplitTransaction {
/** Shared logger; note that it is registered under the SplitTransaction interface class. */
private static final Log LOG = LogFactory.getLog(SplitTransaction.class);

/** Region being split; the constructor casts the supplied Region to HRegion. */
private final HRegion parent;
/** First daughter descriptor (parent start key up to the split row); assigned in prepare(). */
private HRegionInfo hri_a;
/** Second daughter descriptor (split row up to the parent end key); assigned in prepare(). */
private HRegionInfo hri_b;
/**
 * Milliseconds splitStoreFiles() waits for the splitter pool to finish. createDaughters() may
 * replace the default with the "hbase.regionserver.fileSplitTimeout" setting.
 */
private long fileSplitTimeout = 30000;
/** Details object passed to the SplitTransactionCoordination wait/complete calls. */
public SplitTransactionCoordination.SplitTransactionDetails std;
/**
 * Result of ConfigUtil.useZKForAssignment; set in the constructor from the parent's base
 * configuration and recomputed in execute() from the server configuration.
 */
boolean useZKForAssignment;

/** Row at which the parent region is split. */
private final byte [] splitrow;

/** Phase most recently passed to transition(); handed to listeners as the "from" phase. */
private SplitTransactionPhase currentPhase = SplitTransactionPhase.STARTED;
/** Server given to execute(); returned by getServer(). */
private Server server;
/** Services given to execute(); returned by getRegionServerServices(). */
private RegionServerServices rsServices;
90
91 public static class JournalEntryImpl implements JournalEntry {
92 private SplitTransactionPhase type;
93 private long timestamp;
94
95 public JournalEntryImpl(SplitTransactionPhase type) {
96 this(type, EnvironmentEdgeManager.currentTime());
97 }
98
99 public JournalEntryImpl(SplitTransactionPhase type, long timestamp) {
100 this.type = type;
101 this.timestamp = timestamp;
102 }
103
104 @Override
105 public String toString() {
106 StringBuilder sb = new StringBuilder();
107 sb.append(type);
108 sb.append(" at ");
109 sb.append(timestamp);
110 return sb.toString();
111 }
112
113 @Override
114 public SplitTransactionPhase getPhase() {
115 return type;
116 }
117
118 @Override
119 public long getTimeStamp() {
120 return timestamp;
121 }
122 }
123
124
125
126
/**
 * Ordered record of the phases this transaction has entered. transition() appends to it on
 * forward progress and rollback() walks it from the end back to the start.
 */
private final List<JournalEntry> journal = new ArrayList<JournalEntry>();

/** Listeners notified by transition() on every forward transition and every rollback step. */
private final ArrayList<TransactionListener> listeners = new ArrayList<TransactionListener>();
133
134
135
136
137
138
139 public SplitTransactionImpl(final Region r, final byte [] splitrow) {
140 this.parent = (HRegion)r;
141 this.splitrow = splitrow;
142 this.journal.add(new JournalEntryImpl(SplitTransactionPhase.STARTED));
143 useZKForAssignment = ConfigUtil.useZKForAssignment(parent.getBaseConf());
144 }
145
146 private void transition(SplitTransactionPhase nextPhase) throws IOException {
147 transition(nextPhase, false);
148 }
149
150 private void transition(SplitTransactionPhase nextPhase, boolean isRollback)
151 throws IOException {
152 if (!isRollback) {
153
154
155 this.journal.add(new JournalEntryImpl(nextPhase));
156 }
157 for (int i = 0; i < listeners.size(); i++) {
158 TransactionListener listener = listeners.get(i);
159 if (!isRollback) {
160 listener.transition(this, currentPhase, nextPhase);
161 } else {
162 listener.rollback(this, currentPhase, nextPhase);
163 }
164 }
165 currentPhase = nextPhase;
166 }
167
168
169
170
171
172
173 public boolean prepare() throws IOException {
174 if (!this.parent.isSplittable()) return false;
175
176 if (this.splitrow == null) return false;
177 HRegionInfo hri = this.parent.getRegionInfo();
178 parent.prepareToSplit();
179
180 byte [] startKey = hri.getStartKey();
181 byte [] endKey = hri.getEndKey();
182 if (Bytes.equals(startKey, splitrow) ||
183 !this.parent.getRegionInfo().containsRow(splitrow)) {
184 LOG.info("Split row is not inside region key range or is equal to " +
185 "startkey: " + Bytes.toStringBinary(this.splitrow));
186 return false;
187 }
188 long rid = getDaughterRegionIdTimestamp(hri);
189 this.hri_a = new HRegionInfo(hri.getTable(), startKey, this.splitrow, false, rid);
190 this.hri_b = new HRegionInfo(hri.getTable(), this.splitrow, endKey, false, rid);
191
192 transition(SplitTransactionPhase.PREPARED);
193
194 return true;
195 }
196
197
198
199
200
201
202 private static long getDaughterRegionIdTimestamp(final HRegionInfo hri) {
203 long rid = EnvironmentEdgeManager.currentTime();
204
205
206 if (rid < hri.getRegionId()) {
207 LOG.warn("Clock skew; parent regions id is " + hri.getRegionId() +
208 " but current time here is " + rid);
209 rid = hri.getRegionId() + 1;
210 }
211 return rid;
212 }
213
/**
 * Sentinel used by stepsBeforePONR() when closing the parent returns null without throwing.
 * It is compared by identity there to decide whether CLOSED_PARENT_REGION is journaled.
 * Being a single shared instance, its stack trace reflects class initialisation rather than
 * the failing call.
 */
private static IOException closedByOtherException = new IOException(
    "Failed to close region: already closed by another thread");
216
217
    final RegionServerServices services) throws IOException {
  // Convenience overload: delegates to the three-argument createDaughters with a null user.
  return createDaughters(server, services, null);
}
221
222
223
224
225
226
227
228
229
230
231
232
    final RegionServerServices services, User user) throws IOException {
  // Runs everything up to and including the PONR transition: the pre-split coprocessor hooks,
  // the steps in stepsBeforePONR(), the preSplitBeforePONR hook, and finally the meta update
  // or the SPLIT_PONR report to the master. Returns the pair of daughter regions.
  LOG.info("Starting split of region " + this.parent);
  // Refuse to start if the server is stopped or the services are stopping.
  if ((server != null && server.isStopped()) ||
      (services != null && services.isStopping())) {
    throw new IOException("Server is stopped or stopping");
  }
  assert !this.parent.lock.writeLock().isHeldByCurrentThread():
    "Unsafe to hold write lock while performing RPCs";

  transition(SplitTransactionPhase.BEFORE_PRE_SPLIT_HOOK);

  // Invoke both preSplit hook variants, directly or as the supplied user.
  if (this.parent.getCoprocessorHost() != null) {
    if (user == null) {
      // No acting user supplied: call the hooks directly.
      parent.getCoprocessorHost().preSplit();
      parent.getCoprocessorHost().preSplit(splitrow);
    } else {
      try {
        user.getUGI().doAs(new PrivilegedExceptionAction<Void>() {
          @Override
          public Void run() throws Exception {
            parent.getCoprocessorHost().preSplit();
            parent.getCoprocessorHost().preSplit(splitrow);
            return null;
          }
        });
      } catch (InterruptedException ie) {
        // Surface the interruption as an IOException subtype, keeping the cause.
        InterruptedIOException iioe = new InterruptedIOException();
        iioe.initCause(ie);
        throw iioe;
      }
    }
  }

  transition(SplitTransactionPhase.AFTER_PRE_SPLIT_HOOK);

  // A null server is treated as a test run; otherwise "hbase.testing.nocluster" decides.
  boolean testing = server == null? true:
      server.getConfiguration().getBoolean("hbase.testing.nocluster", false);
  // Outside of testing, the file split timeout may be overridden from configuration.
  this.fileSplitTimeout = testing ? this.fileSplitTimeout :
      server.getConfiguration().getLong("hbase.regionserver.fileSplitTimeout",
          this.fileSplitTimeout);

  PairOfSameType<Region> daughterRegions = stepsBeforePONR(server, services, testing);

  // The coprocessor may add meta mutations here and may veto the split by returning true.
  final List<Mutation> metaEntries = new ArrayList<Mutation>();
  boolean ret = false;
  if (this.parent.getCoprocessorHost() != null) {
    if (user == null) {
      ret = parent.getCoprocessorHost().preSplitBeforePONR(splitrow, metaEntries);
    } else {
      try {
        ret = user.getUGI().doAs(new PrivilegedExceptionAction<Boolean>() {
          @Override
          public Boolean run() throws Exception {
            return parent.getCoprocessorHost().preSplitBeforePONR(splitrow, metaEntries);
          }
        });
      } catch (InterruptedException ie) {
        InterruptedIOException iioe = new InterruptedIOException();
        iioe.initCause(ie);
        throw iioe;
      }
    }
    if (ret) {
      throw new IOException("Coprocessor bypassing region "
          + this.parent.getRegionInfo().getRegionNameAsString() + " split.");
    }
    // Every coprocessor-supplied mutation must have a row key that parses as a region name.
    try {
      for (Mutation p : metaEntries) {
        HRegionInfo.parseRegionName(p.getRow());
      }
    } catch (IOException e) {
      LOG.error("Row key of mutation from coprossor is not parsable as region name."
          + "Mutations from coprocessor should only for hbase:meta table.");
      throw e;
    }
  }

  // From this journal entry on, rollback() returns false instead of undoing the split.
  transition(SplitTransactionPhase.PONR);

  // With ZK assignment (and not testing) this server edits meta itself; otherwise, when
  // services are available and ZK assignment is off, the master is told via SPLIT_PONR.
  if (!testing && useZKForAssignment) {
    if (metaEntries == null || metaEntries.isEmpty()) {
      MetaTableAccessor.splitRegion(server.getConnection(),
        parent.getRegionInfo(), daughterRegions.getFirst().getRegionInfo(),
        daughterRegions.getSecond().getRegionInfo(), server.getServerName(),
        parent.getTableDesc().getRegionReplication());
    } else {
      offlineParentInMetaAndputMetaEntries(server.getConnection(),
        parent.getRegionInfo(), daughterRegions.getFirst().getRegionInfo(), daughterRegions
            .getSecond().getRegionInfo(), server.getServerName(), metaEntries,
            parent.getTableDesc().getRegionReplication());
    }
  } else if (services != null && !useZKForAssignment) {
    if (!services.reportRegionStateTransition(TransitionCode.SPLIT_PONR,
        parent.getRegionInfo(), hri_a, hri_b)) {
      // The master did not acknowledge the PONR report.
      throw new IOException("Failed to notify master that split passed PONR: "
        + parent.getRegionInfo().getRegionNameAsString());
    }
  }
  return daughterRegions;
}
357
/**
 * Performs the rollback-able part of the split: announces the split, creates the splits
 * directory, closes and offlines the parent, writes the daughter reference files and creates
 * both daughter regions. Each step is journaled through transition() so rollback() can undo it.
 * @param server hosting server; may be null, in which case no coordination calls are made
 * @param services region server services; dereferenced when {@code testing} is false
 * @param testing when true the parent is not removed from the online regions
 * @return the two daughter regions, first and second in key order
 * @throws IOException if the master refuses the split, the parent cannot be closed, or a
 *   reference-file count check fails
 */
public PairOfSameType<Region> stepsBeforePONR(final Server server,
    final RegionServerServices services, boolean testing) throws IOException {

  // Announce the split either through the coordinated state manager or to the master.
  if (useCoordinatedStateManager(server)) {
    if (std == null) {
      std =
          ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
              .getSplitTransactionCoordination().getDefaultDetails();
    }
    ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
        .getSplitTransactionCoordination().startSplitTransaction(parent, server.getServerName(),
          hri_a, hri_b);
  } else if (services != null && !useZKForAssignment) {
    if (!services.reportRegionStateTransition(TransitionCode.READY_TO_SPLIT,
        parent.getRegionInfo(), hri_a, hri_b)) {
      throw new IOException("Failed to get ok from master to split "
        + parent.getRegionInfo().getRegionNameAsString());
    }
  }

  transition(SplitTransactionPhase.SET_SPLITTING);

  if (useCoordinatedStateManager(server)) {
    ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
        .getSplitTransactionCoordination().waitForSplitTransaction(services, parent, hri_a,
          hri_b, std);
  }

  this.parent.getRegionFileSystem().createSplitsDir();

  transition(SplitTransactionPhase.CREATE_SPLIT_DIR);

  // Close the parent, capturing any failure so the journal can be updated before rethrowing.
  Map<byte[], List<StoreFile>> hstoreFilesToSplit = null;
  Exception exceptionToThrow = null;
  try{
    hstoreFilesToSplit = this.parent.close(false);
  } catch (Exception e) {
    exceptionToThrow = e;
  }
  if (exceptionToThrow == null && hstoreFilesToSplit == null) {
    // close() returned null without throwing; the sentinel's message attributes this to the
    // region having already been closed by another thread.
    exceptionToThrow = closedByOtherException;
  }
  // CLOSED_PARENT_REGION is journaled on success and on a real close failure, but not for the
  // sentinel case, so rollback() will not re-initialize the parent in that case.
  if (exceptionToThrow != closedByOtherException) {
    transition(SplitTransactionPhase.CLOSED_PARENT_REGION);
  }
  if (exceptionToThrow != null) {
    if (exceptionToThrow instanceof IOException) throw (IOException)exceptionToThrow;
    throw new IOException(exceptionToThrow);
  }
  if (!testing) {
    // NOTE(review): services is dereferenced without a null check here — confirm callers
    // never pass null services with testing == false.
    services.removeFromOnlineRegions(this.parent, null);
  }

  transition(SplitTransactionPhase.OFFLINED_PARENT);

  // Write the reference files for both daughters; the pair holds the number created for
  // the first and second daughter respectively.
  Pair<Integer, Integer> expectedReferences = splitStoreFiles(hstoreFilesToSplit);

  transition(SplitTransactionPhase.STARTED_REGION_A_CREATION);

  // Check the reference count in the splits dir before, and in the table dir after,
  // creating the first daughter from the split files.
  assertReferenceFileCount(expectedReferences.getFirst(),
      this.parent.getRegionFileSystem().getSplitsDir(this.hri_a));
  Region a = this.parent.createDaughterRegionFromSplits(this.hri_a);
  assertReferenceFileCount(expectedReferences.getFirst(),
      new Path(this.parent.getRegionFileSystem().getTableDir(), this.hri_a.getEncodedName()));

  transition(SplitTransactionPhase.STARTED_REGION_B_CREATION);

  // Same checks for the second daughter.
  assertReferenceFileCount(expectedReferences.getSecond(),
      this.parent.getRegionFileSystem().getSplitsDir(this.hri_b));
  Region b = this.parent.createDaughterRegionFromSplits(this.hri_b);
  assertReferenceFileCount(expectedReferences.getSecond(),
      new Path(this.parent.getRegionFileSystem().getTableDir(), this.hri_b.getEncodedName()));

  return new PairOfSameType<Region>(a, b);
}
449
450 void assertReferenceFileCount(int expectedReferenceFileCount, Path dir)
451 throws IOException {
452 if (expectedReferenceFileCount != 0 &&
453 expectedReferenceFileCount != FSUtils.getRegionReferenceFileCount(parent.getFilesystem(),
454 dir)) {
455 throw new IOException("Failing split. Expected reference file count isn't equal.");
456 }
457 }
458
459
460
461
462
463
464
465
466
467
468
    final RegionServerServices services, Region a, Region b)
    throws IOException {
  // Opens both daughters in parallel and, when services are available, brings them online.
  // Nothing is opened if the server is stopped or the services are stopping.
  boolean stopped = server != null && server.isStopped();
  boolean stopping = services != null && services.isStopping();

  if (stopped || stopping) {
    LOG.info("Not opening daughters " +
        b.getRegionInfo().getRegionNameAsString() +
        " and " +
        a.getRegionInfo().getRegionNameAsString() +
        " because stopping=" + stopping + ", stopped=" + stopped);
  } else {
    // One opener thread per daughter; both are started before either is joined.
    DaughterOpener aOpener = new DaughterOpener(server, (HRegion)a);
    DaughterOpener bOpener = new DaughterOpener(server, (HRegion)b);
    aOpener.start();
    bOpener.start();
    try {
      aOpener.join();
      // A successful open is journaled; failures are rethrown after both joins complete.
      if (aOpener.getException() == null) {
        transition(SplitTransactionPhase.OPENED_REGION_A);
      }
      bOpener.join();
      if (bOpener.getException() == null) {
        transition(SplitTransactionPhase.OPENED_REGION_B);
      }
    } catch (InterruptedException e) {
      throw (InterruptedIOException)new InterruptedIOException().initCause(e);
    }
    if (aOpener.getException() != null) {
      throw new IOException("Failed " +
        aOpener.getName(), aOpener.getException());
    }
    if (bOpener.getException() != null) {
      throw new IOException("Failed " +
        bOpener.getName(), bOpener.getException());
    }
    if (services != null) {
      try {
        if (useZKForAssignment) {
          // ZK assignment: run the post-open deploy tasks for b before adding it online.
          services.postOpenDeployTasks(b);
        } else if (!services.reportRegionStateTransition(TransitionCode.SPLIT,
            parent.getRegionInfo(), hri_a, hri_b)) {
          throw new IOException("Failed to report split region to master: "
            + parent.getRegionInfo().getShortNameToLog());
        }
        // Daughter b is added to the online regions before daughter a.
        services.addToOnlineRegions(b);
        if (useZKForAssignment) {
          services.postOpenDeployTasks(a);
        }
        services.addToOnlineRegions(a);
      } catch (KeeperException ke) {
        throw new IOException(ke);
      }
    }
  }
}
528
529 public PairOfSameType<Region> execute(final Server server,
530 final RegionServerServices services)
531 throws IOException {
532 if (User.isHBaseSecurityEnabled(parent.getBaseConf())) {
533 LOG.warn("Should use execute(Server, RegionServerServices, User)");
534 }
535 return execute(server, services, null);
536 }
537
538
539
540
541
542
543
544
545
546
547
548 @Override
549 public PairOfSameType<Region> execute(final Server server,
550 final RegionServerServices services, User user) throws IOException {
551 this.server = server;
552 this.rsServices = services;
553 useZKForAssignment = server == null ? true :
554 ConfigUtil.useZKForAssignment(server.getConfiguration());
555 if (useCoordinatedStateManager(server)) {
556 std =
557 ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
558 .getSplitTransactionCoordination().getDefaultDetails();
559 }
560 PairOfSameType<Region> regions = createDaughters(server, services, user);
561 if (this.parent.getCoprocessorHost() != null) {
562 if (user == null) {
563 parent.getCoprocessorHost().preSplitAfterPONR();
564 } else {
565 try {
566 user.getUGI().doAs(new PrivilegedExceptionAction<Void>() {
567 @Override
568 public Void run() throws Exception {
569 parent.getCoprocessorHost().preSplitAfterPONR();
570 return null;
571 }
572 });
573 } catch (InterruptedException ie) {
574 InterruptedIOException iioe = new InterruptedIOException();
575 iioe.initCause(ie);
576 throw iioe;
577 }
578 }
579 }
580 regions = stepsAfterPONR(server, services, regions, user);
581
582 transition(SplitTransactionPhase.COMPLETED);
583
584 return regions;
585 }
586
587 @Deprecated
588 public PairOfSameType<Region> stepsAfterPONR(final Server server,
589 final RegionServerServices services, final PairOfSameType<Region> regions)
590 throws IOException {
591 return stepsAfterPONR(server, services, regions, null);
592 }
593
594 public PairOfSameType<Region> stepsAfterPONR(final Server server,
595 final RegionServerServices services, final PairOfSameType<Region> regions, User user)
596 throws IOException {
597 openDaughters(server, services, regions.getFirst(), regions.getSecond());
598 if (useCoordinatedStateManager(server)) {
599 ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
600 .getSplitTransactionCoordination().completeSplitTransaction(services, regions.getFirst(),
601 regions.getSecond(), std, parent);
602 }
603
604 transition(SplitTransactionPhase.BEFORE_POST_SPLIT_HOOK);
605
606
607 if (parent.getCoprocessorHost() != null) {
608 if (user == null) {
609 this.parent.getCoprocessorHost().postSplit(regions.getFirst(), regions.getSecond());
610 } else {
611 try {
612 user.getUGI().doAs(new PrivilegedExceptionAction<Void>() {
613 @Override
614 public Void run() throws Exception {
615 parent.getCoprocessorHost().postSplit(regions.getFirst(), regions.getSecond());
616 return null;
617 }
618 });
619 } catch (InterruptedException ie) {
620 InterruptedIOException iioe = new InterruptedIOException();
621 iioe.initCause(ie);
622 throw iioe;
623 }
624 }
625 }
626
627 transition(SplitTransactionPhase.AFTER_POST_SPLIT_HOOK);
628
629 return regions;
630 }
631
632 private void offlineParentInMetaAndputMetaEntries(HConnection hConnection,
633 HRegionInfo parent, HRegionInfo splitA, HRegionInfo splitB,
634 ServerName serverName, List<Mutation> metaEntries, int regionReplication)
635 throws IOException {
636 List<Mutation> mutations = metaEntries;
637 HRegionInfo copyOfParent = new HRegionInfo(parent);
638 copyOfParent.setOffline(true);
639 copyOfParent.setSplit(true);
640
641
642 Put putParent = MetaTableAccessor.makePutFromRegionInfo(copyOfParent);
643 MetaTableAccessor.addDaughtersToPut(putParent, splitA, splitB);
644 mutations.add(putParent);
645
646
647 Put putA = MetaTableAccessor.makePutFromRegionInfo(splitA);
648 Put putB = MetaTableAccessor.makePutFromRegionInfo(splitB);
649
650 addLocation(putA, serverName, 1);
651 addLocation(putB, serverName, 1);
652 mutations.add(putA);
653 mutations.add(putB);
654
655
656
657 for (int i = 1; i < regionReplication; i++) {
658 addEmptyLocation(putA, i);
659 addEmptyLocation(putB, i);
660 }
661
662 MetaTableAccessor.mutateMetaTable(hConnection, mutations);
663 }
664
665 private static Put addEmptyLocation(final Put p, int replicaId){
666 p.addImmutable(HConstants.CATALOG_FAMILY, MetaTableAccessor.getServerColumn(replicaId), null);
667 p.addImmutable(HConstants.CATALOG_FAMILY, MetaTableAccessor.getStartCodeColumn(replicaId),
668 null);
669 p.addImmutable(HConstants.CATALOG_FAMILY, MetaTableAccessor.getSeqNumColumn(replicaId), null);
670 return p;
671 }
672
673 public Put addLocation(final Put p, final ServerName sn, long openSeqNum) {
674 p.addImmutable(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER,
675 Bytes.toBytes(sn.getHostAndPort()));
676 p.addImmutable(HConstants.CATALOG_FAMILY, HConstants.STARTCODE_QUALIFIER,
677 Bytes.toBytes(sn.getStartcode()));
678 p.addImmutable(HConstants.CATALOG_FAMILY, HConstants.SEQNUM_QUALIFIER,
679 Bytes.toBytes(openSeqNum));
680 return p;
681 }
682
683
684
685
686
687 class DaughterOpener extends HasThread {
688 private final Server server;
689 private final HRegion r;
690 private Throwable t = null;
691
692 DaughterOpener(final Server s, final HRegion r) {
693 super((s == null? "null-services": s.getServerName()) +
694 "-daughterOpener=" + r.getRegionInfo().getEncodedName());
695 setDaemon(true);
696 this.server = s;
697 this.r = r;
698 }
699
700
701
702
703
704 Throwable getException() {
705 return this.t;
706 }
707
708 @Override
709 public void run() {
710 try {
711 openDaughterRegion(this.server, r);
712 } catch (Throwable t) {
713 this.t = t;
714 }
715 }
716 }
717
718
719
720
721
722
723
724
725 void openDaughterRegion(final Server server, final HRegion daughter)
726 throws IOException, KeeperException {
727 HRegionInfo hri = daughter.getRegionInfo();
728 LoggingProgressable reporter = server == null ? null
729 : new LoggingProgressable(hri, server.getConfiguration().getLong(
730 "hbase.regionserver.split.daughter.open.log.interval", 10000));
731 daughter.openHRegion(reporter);
732 }
733
734 static class LoggingProgressable implements CancelableProgressable {
735 private final HRegionInfo hri;
736 private long lastLog = -1;
737 private final long interval;
738
739 LoggingProgressable(final HRegionInfo hri, final long interval) {
740 this.hri = hri;
741 this.interval = interval;
742 }
743
744 @Override
745 public boolean progress() {
746 long now = EnvironmentEdgeManager.currentTime();
747 if (now - lastLog > this.interval) {
748 LOG.info("Opening " + this.hri.getRegionNameAsString());
749 this.lastLog = now;
750 }
751 return true;
752 }
753 }
754
755 private boolean useCoordinatedStateManager(final Server server) {
756 return server != null && useZKForAssignment && server.getCoordinatedStateManager() != null;
757 }
758
759
760
761
762
763
764
765 private Pair<Integer, Integer> splitStoreFiles(
766 final Map<byte[], List<StoreFile>> hstoreFilesToSplit)
767 throws IOException {
768 if (hstoreFilesToSplit == null) {
769
770 throw new IOException("Close returned empty list of StoreFiles");
771 }
772
773
774
775 int nbFiles = 0;
776 for (Map.Entry<byte[], List<StoreFile>> entry: hstoreFilesToSplit.entrySet()) {
777 nbFiles += entry.getValue().size();
778 }
779 if (nbFiles == 0) {
780
781 return new Pair<Integer, Integer>(0,0);
782 }
783
784
785 int defMaxThreads = Math.min(parent.conf.getInt(HStore.BLOCKING_STOREFILES_KEY,
786 HStore.DEFAULT_BLOCKING_STOREFILE_COUNT),
787 Runtime.getRuntime().availableProcessors());
788
789 int maxThreads = Math.min(parent.conf.getInt(HConstants.REGION_SPLIT_THREADS_MAX,
790 defMaxThreads), nbFiles);
791 LOG.info("Preparing to split " + nbFiles + " storefiles for region " + this.parent +
792 " using " + maxThreads + " threads");
793 ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
794 builder.setNameFormat("StoreFileSplitter-%1$d");
795 ThreadFactory factory = builder.build();
796 ThreadPoolExecutor threadPool =
797 (ThreadPoolExecutor) Executors.newFixedThreadPool(maxThreads, factory);
798 List<Future<Pair<Path,Path>>> futures = new ArrayList<Future<Pair<Path,Path>>> (nbFiles);
799
800
801 for (Map.Entry<byte[], List<StoreFile>> entry: hstoreFilesToSplit.entrySet()) {
802 for (StoreFile sf: entry.getValue()) {
803 StoreFileSplitter sfs = new StoreFileSplitter(entry.getKey(), sf);
804 futures.add(threadPool.submit(sfs));
805 }
806 }
807
808 threadPool.shutdown();
809
810
811 try {
812 boolean stillRunning = !threadPool.awaitTermination(
813 this.fileSplitTimeout, TimeUnit.MILLISECONDS);
814 if (stillRunning) {
815 threadPool.shutdownNow();
816
817 while (!threadPool.isTerminated()) {
818 Thread.sleep(50);
819 }
820 throw new IOException("Took too long to split the" +
821 " files and create the references, aborting split");
822 }
823 } catch (InterruptedException e) {
824 throw (InterruptedIOException)new InterruptedIOException().initCause(e);
825 }
826
827 int created_a = 0;
828 int created_b = 0;
829
830 for (Future<Pair<Path, Path>> future : futures) {
831 try {
832 Pair<Path, Path> p = future.get();
833 created_a += p.getFirst() != null ? 1 : 0;
834 created_b += p.getSecond() != null ? 1 : 0;
835 } catch (InterruptedException e) {
836 throw (InterruptedIOException) new InterruptedIOException().initCause(e);
837 } catch (ExecutionException e) {
838 throw new IOException(e);
839 }
840 }
841
842 if (LOG.isDebugEnabled()) {
843 LOG.debug("Split storefiles for region " + this.parent + " Daughter A: " + created_a
844 + " storefiles, Daughter B: " + created_b + " storefiles.");
845 }
846 return new Pair<Integer, Integer>(created_a, created_b);
847 }
848
849 private Pair<Path, Path> splitStoreFile(final byte[] family, final StoreFile sf)
850 throws IOException {
851 if (LOG.isDebugEnabled()) {
852 LOG.debug("Splitting started for store file: " + sf.getPath() + " for region: " +
853 this.parent);
854 }
855 HRegionFileSystem fs = this.parent.getRegionFileSystem();
856 String familyName = Bytes.toString(family);
857
858 Path path_a =
859 fs.splitStoreFile(this.hri_a, familyName, sf, this.splitrow, false,
860 this.parent.getSplitPolicy());
861 Path path_b =
862 fs.splitStoreFile(this.hri_b, familyName, sf, this.splitrow, true,
863 this.parent.getSplitPolicy());
864 if (LOG.isDebugEnabled()) {
865 LOG.debug("Splitting complete for store file: " + sf.getPath() + " for region: " +
866 this.parent);
867 }
868 return new Pair<Path,Path>(path_a, path_b);
869 }
870
871
872
873
874
875 class StoreFileSplitter implements Callable<Pair<Path,Path>> {
876 private final byte[] family;
877 private final StoreFile sf;
878
879
880
881
882
883
884 public StoreFileSplitter(final byte[] family, final StoreFile sf) {
885 this.sf = sf;
886 this.family = family;
887 }
888
889 public Pair<Path,Path> call() throws IOException {
890 return splitStoreFile(family, sf);
891 }
892 }
893
894 @Override
895 public boolean rollback(final Server server, final RegionServerServices services)
896 throws IOException {
897 if (User.isHBaseSecurityEnabled(parent.getBaseConf())) {
898 LOG.warn("Should use rollback(Server, RegionServerServices, User)");
899 }
900 return rollback(server, services, null);
901 }
902
903
904
905
906
907
908
909
/**
 * Undoes the split by replaying the journal from the most recent entry back to the first,
 * wrapped in the preRollBackSplit / postRollBackSplit coprocessor hooks.
 * @param server hosting server
 * @param services region server services; may be null
 * @param user user to run the coprocessor hooks as; null runs them directly
 * @return true when every journaled step was undone; false when the PONR entry is reached or
 *   the master rejects the SPLIT_REVERTED report (in both cases the post-rollback hook is
 *   skipped)
 * @throws IOException if a hook, listener or cleanup step fails
 */
@Override
@SuppressWarnings("deprecation")
public boolean rollback(final Server server, final RegionServerServices services, User user)
    throws IOException {
  // Pre-rollback coprocessor hook, run directly or as the supplied user.
  if (this.parent.getCoprocessorHost() != null) {
    if (user == null) {
      this.parent.getCoprocessorHost().preRollBackSplit();
    } else {
      try {
        user.getUGI().doAs(new PrivilegedExceptionAction<Void>() {
          @Override
          public Void run() throws Exception {
            parent.getCoprocessorHost().preRollBackSplit();
            return null;
          }
        });
      } catch (InterruptedException ie) {
        InterruptedIOException iioe = new InterruptedIOException();
        iioe.initCause(ie);
        throw iioe;
      }
    }
  }

  boolean result = true;
  // Start past the last entry so previous() yields entries newest-first.
  ListIterator<JournalEntry> iterator =
    this.journal.listIterator(this.journal.size());
  // Walk the journal backwards, undoing one phase at a time.
  while (iterator.hasPrevious()) {
    JournalEntry je = iterator.previous();

    // Tell listeners about the rollback step; rollback transitions are not journaled.
    transition(je.getPhase(), true);

    switch(je.getPhase()) {

    case SET_SPLITTING:
      // Clean up the coordination state, or tell the master the split was reverted.
      if (useCoordinatedStateManager(server) && server instanceof HRegionServer) {
        ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
            .getSplitTransactionCoordination().clean(this.parent.getRegionInfo());
      } else if (services != null && !useZKForAssignment
          && !services.reportRegionStateTransition(TransitionCode.SPLIT_REVERTED,
              parent.getRegionInfo(), hri_a, hri_b)) {
        // The master did not accept the revert; give up without finishing the rollback.
        return false;
      }
      break;

    case CREATE_SPLIT_DIR:
      // Re-enable writes on the parent and remove the splits directory.
      this.parent.writestate.writesEnabled = true;
      this.parent.getRegionFileSystem().cleanupSplitsDir();
      break;

    case CLOSED_PARENT_REGION:
      try {
        // Re-initialize the parent region that was closed for the split.
        this.parent.initialize();
      } catch (IOException e) {
        LOG.error("Failed rollbacking CLOSED_PARENT_REGION of region " +
          this.parent.getRegionInfo().getRegionNameAsString(), e);
        // Escalated as unchecked: callers see a RuntimeException, not an IOException.
        throw new RuntimeException(e);
      }
      break;

    case STARTED_REGION_A_CREATION:
      this.parent.getRegionFileSystem().cleanupDaughterRegion(this.hri_a);
      break;

    case STARTED_REGION_B_CREATION:
      this.parent.getRegionFileSystem().cleanupDaughterRegion(this.hri_b);
      break;

    case OFFLINED_PARENT:
      // Put the parent back among the online regions.
      if (services != null) services.addToOnlineRegions(this.parent);
      break;

    case PONR:
      // Point of no return was journaled: nothing is undone and the rollback reports
      // failure without running the post-rollback hook.
      return false;

    // Phases with nothing to undo.
    case STARTED:
    case PREPARED:
    case BEFORE_PRE_SPLIT_HOOK:
    case AFTER_PRE_SPLIT_HOOK:
    case BEFORE_POST_SPLIT_HOOK:
    case AFTER_POST_SPLIT_HOOK:
    case OPENED_REGION_A:
    case OPENED_REGION_B:
    case COMPLETED:
      break;

    default:
      throw new RuntimeException("Unhandled journal entry: " + je);
    }
  }
  // Post-rollback coprocessor hook; reached only when the whole journal was replayed.
  if (this.parent.getCoprocessorHost() != null) {
    if (user == null) {
      this.parent.getCoprocessorHost().postRollBackSplit();
    } else {
      try {
        user.getUGI().doAs(new PrivilegedExceptionAction<Void>() {
          @Override
          public Void run() throws Exception {
            parent.getCoprocessorHost().postRollBackSplit();
            return null;
          }
        });
      } catch (InterruptedException ie) {
        InterruptedIOException iioe = new InterruptedIOException();
        iioe.initCause(ie);
        throw iioe;
      }
    }
  }
  return result;
}
1034
1035 HRegionInfo getFirstDaughter() {
1036 return hri_a;
1037 }
1038
1039 HRegionInfo getSecondDaughter() {
1040 return hri_b;
1041 }
1042
1043 @Override
1044 public List<JournalEntry> getJournal() {
1045 return journal;
1046 }
1047
1048 @Override
1049 public SplitTransaction registerTransactionListener(TransactionListener listener) {
1050 listeners.add(listener);
1051 return this;
1052 }
1053
1054 @Override
1055 public Server getServer() {
1056 return server;
1057 }
1058
1059 @Override
1060 public RegionServerServices getRegionServerServices() {
1061 return rsServices;
1062 }
1063 }