1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.util;
20
21 import java.io.IOException;
22 import java.math.BigInteger;
23 import java.util.Arrays;
24 import java.util.Collection;
25 import java.util.Collections;
26 import java.util.Comparator;
27 import java.util.LinkedList;
28 import java.util.List;
29 import java.util.Set;
30 import java.util.TreeMap;
31
32 import org.apache.commons.cli.CommandLine;
33 import org.apache.commons.cli.GnuParser;
34 import org.apache.commons.cli.HelpFormatter;
35 import org.apache.commons.cli.OptionBuilder;
36 import org.apache.commons.cli.Options;
37 import org.apache.commons.cli.ParseException;
38 import org.apache.commons.lang.ArrayUtils;
39 import org.apache.commons.lang.StringUtils;
40 import org.apache.commons.logging.Log;
41 import org.apache.commons.logging.LogFactory;
42 import org.apache.hadoop.conf.Configuration;
43 import org.apache.hadoop.fs.FSDataInputStream;
44 import org.apache.hadoop.fs.FSDataOutputStream;
45 import org.apache.hadoop.fs.FileSystem;
46 import org.apache.hadoop.fs.Path;
47 import org.apache.hadoop.hbase.ClusterStatus;
48 import org.apache.hadoop.hbase.HBaseConfiguration;
49 import org.apache.hadoop.hbase.HColumnDescriptor;
50 import org.apache.hadoop.hbase.HRegionInfo;
51 import org.apache.hadoop.hbase.HRegionLocation;
52 import org.apache.hadoop.hbase.HTableDescriptor;
53 import org.apache.hadoop.hbase.MetaTableAccessor;
54 import org.apache.hadoop.hbase.ServerName;
55 import org.apache.hadoop.hbase.TableName;
56 import org.apache.hadoop.hbase.classification.InterfaceAudience;
57 import org.apache.hadoop.hbase.client.Admin;
58 import org.apache.hadoop.hbase.client.ClusterConnection;
59 import org.apache.hadoop.hbase.client.Connection;
60 import org.apache.hadoop.hbase.client.ConnectionFactory;
61 import org.apache.hadoop.hbase.client.NoServerForRegionException;
62 import org.apache.hadoop.hbase.client.RegionLocator;
63 import org.apache.hadoop.hbase.client.Table;
64 import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
65
66 import com.google.common.base.Preconditions;
67 import com.google.common.collect.Lists;
68 import com.google.common.collect.Maps;
69 import com.google.common.collect.Sets;
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142 @InterfaceAudience.Private
143 public class RegionSplitter {
144 private static final Log LOG = LogFactory.getLog(RegionSplitter.class);
145
146
147
148
149
150
151
152
153
154
/**
 * Strategy used by this tool to choose region split points and to convert row
 * keys to and from the text form written to the split log file. Implementations
 * are instantiated reflectively through a no-argument constructor; the built-in
 * ones in this file are HexStringSplit and UniformSplit.
 */
public interface SplitAlgorithm {
  /**
   * Picks the row at which the key range between {@code start} and {@code end}
   * is split in two. Callers in this file replace an empty region boundary with
   * {@link #firstRow()} or {@link #lastRow()} before calling.
   *
   * @param start first row of the range to split
   * @param end last row of the range to split
   * @return the row to split the range at
   */
  byte[] split(byte[] start, byte[] end);

  /**
   * Computes the split keys for a table that is created pre-split; the result
   * is handed to the table-creation call as its split keys.
   *
   * @param numRegions number of regions the new table should have
   * @return the split keys separating those regions
   */
  byte[][] split(int numRegions);

  /**
   * Returns the row used in place of an empty region start key when split
   * points are calculated.
   *
   * @return the first row of the key space as configured on this algorithm
   */
  byte[] firstRow();

  /**
   * Returns the row used in place of an empty region end key when split
   * points are calculated.
   *
   * @return the last row of the key space as configured on this algorithm
   */
  byte[] lastRow();

  /**
   * Overrides the first row of the key space. Receives the value of the
   * {@code --firstrow} command-line option.
   *
   * @param userInput textual representation of the row, in whatever form the
   *          implementation parses
   */
  void setFirstRow(String userInput);

  /**
   * Overrides the last row of the key space. Receives the value of the
   * {@code --lastrow} command-line option.
   *
   * @param userInput textual representation of the row, in whatever form the
   *          implementation parses
   */
  void setLastRow(String userInput);

  /**
   * Converts the text form of a row (as read back from the split log file)
   * into row bytes.
   *
   * @param input text form of the row
   * @return the row bytes
   */
  byte[] strToRow(String input);

  /**
   * Converts row bytes into the text form used in log messages and in the
   * split log file.
   *
   * @param row the row bytes
   * @return text form of the row
   */
  String rowToStr(byte[] row);

  /**
   * @return the string placed between the start key and the split key when a
   *         planned split is written to the split log file
   */
  String separator();

  /**
   * Overrides the first row of the key space from raw bytes.
   *
   * @param userInput the row
   */
  void setFirstRow(byte[] userInput);

  /**
   * Overrides the last row of the key space from raw bytes.
   *
   * @param userInput the row
   */
  void setLastRow(byte[] userInput);
}
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287 @SuppressWarnings("static-access")
288 public static void main(String[] args) throws IOException,
289 InterruptedException, ParseException {
290 Configuration conf = HBaseConfiguration.create();
291
292
293 Options opt = new Options();
294 opt.addOption(OptionBuilder.withArgName("property=value").hasArg()
295 .withDescription("Override HBase Configuration Settings").create("D"));
296 opt.addOption(OptionBuilder.withArgName("region count").hasArg()
297 .withDescription(
298 "Create a new table with a pre-split number of regions")
299 .create("c"));
300 opt.addOption(OptionBuilder.withArgName("family:family:...").hasArg()
301 .withDescription(
302 "Column Families to create with new table. Required with -c")
303 .create("f"));
304 opt.addOption("h", false, "Print this usage help");
305 opt.addOption("r", false, "Perform a rolling split of an existing region");
306 opt.addOption(OptionBuilder.withArgName("count").hasArg().withDescription(
307 "Max outstanding splits that have unfinished major compactions")
308 .create("o"));
309 opt.addOption(null, "firstrow", true,
310 "First Row in Table for Split Algorithm");
311 opt.addOption(null, "lastrow", true,
312 "Last Row in Table for Split Algorithm");
313 opt.addOption(null, "risky", false,
314 "Skip verification steps to complete quickly."
315 + "STRONGLY DISCOURAGED for production systems. ");
316 CommandLine cmd = new GnuParser().parse(opt, args);
317
318 if (cmd.hasOption("D")) {
319 for (String confOpt : cmd.getOptionValues("D")) {
320 String[] kv = confOpt.split("=", 2);
321 if (kv.length == 2) {
322 conf.set(kv[0], kv[1]);
323 LOG.debug("-D configuration override: " + kv[0] + "=" + kv[1]);
324 } else {
325 throw new ParseException("-D option format invalid: " + confOpt);
326 }
327 }
328 }
329
330 if (cmd.hasOption("risky")) {
331 conf.setBoolean("split.verify", false);
332 }
333
334 boolean createTable = cmd.hasOption("c") && cmd.hasOption("f");
335 boolean rollingSplit = cmd.hasOption("r");
336 boolean oneOperOnly = createTable ^ rollingSplit;
337
338 if (2 != cmd.getArgList().size() || !oneOperOnly || cmd.hasOption("h")) {
339 new HelpFormatter().printHelp("RegionSplitter <TABLE> <SPLITALGORITHM>\n"+
340 "SPLITALGORITHM is a java class name of a class implementing " +
341 "SplitAlgorithm, or one of the special strings HexStringSplit " +
342 "or UniformSplit, which are built-in split algorithms. " +
343 "HexStringSplit treats keys as hexadecimal ASCII, and " +
344 "UniformSplit treats keys as arbitrary bytes.", opt);
345 return;
346 }
347 TableName tableName = TableName.valueOf(cmd.getArgs()[0]);
348 String splitClass = cmd.getArgs()[1];
349 SplitAlgorithm splitAlgo = newSplitAlgoInstance(conf, splitClass);
350
351 if (cmd.hasOption("firstrow")) {
352 splitAlgo.setFirstRow(cmd.getOptionValue("firstrow"));
353 }
354 if (cmd.hasOption("lastrow")) {
355 splitAlgo.setLastRow(cmd.getOptionValue("lastrow"));
356 }
357
358 if (createTable) {
359 conf.set("split.count", cmd.getOptionValue("c"));
360 createPresplitTable(tableName, splitAlgo, cmd.getOptionValue("f").split(":"), conf);
361 }
362
363 if (rollingSplit) {
364 if (cmd.hasOption("o")) {
365 conf.set("split.outstanding", cmd.getOptionValue("o"));
366 }
367 rollingSplit(tableName, splitAlgo, conf);
368 }
369 }
370
371 static void createPresplitTable(TableName tableName, SplitAlgorithm splitAlgo,
372 String[] columnFamilies, Configuration conf)
373 throws IOException, InterruptedException {
374 final int splitCount = conf.getInt("split.count", 0);
375 Preconditions.checkArgument(splitCount > 1, "Split count must be > 1");
376
377 Preconditions.checkArgument(columnFamilies.length > 0,
378 "Must specify at least one column family. ");
379 LOG.debug("Creating table " + tableName + " with " + columnFamilies.length
380 + " column families. Presplitting to " + splitCount + " regions");
381
382 HTableDescriptor desc = new HTableDescriptor(tableName);
383 for (String cf : columnFamilies) {
384 desc.addFamily(new HColumnDescriptor(Bytes.toBytes(cf)));
385 }
386 try (Connection connection = ConnectionFactory.createConnection(conf)) {
387 Admin admin = connection.getAdmin();
388 try {
389 Preconditions.checkArgument(!admin.tableExists(tableName),
390 "Table already exists: " + tableName);
391 admin.createTable(desc, splitAlgo.split(splitCount));
392 } finally {
393 admin.close();
394 }
395 LOG.debug("Table created! Waiting for regions to show online in META...");
396 if (!conf.getBoolean("split.verify", true)) {
397
398 int onlineRegions = 0;
399 while (onlineRegions < splitCount) {
400 onlineRegions = MetaTableAccessor.getRegionCount(connection, tableName);
401 LOG.debug(onlineRegions + " of " + splitCount + " regions online...");
402 if (onlineRegions < splitCount) {
403 Thread.sleep(10 * 1000);
404 }
405 }
406 }
407 LOG.debug("Finished creating table with " + splitCount + " regions");
408 }
409 }
410
411
412
413
414
415
416
417 private static int getRegionServerCount(final Connection connection) throws IOException {
418 try (Admin admin = connection.getAdmin()) {
419 ClusterStatus status = admin.getClusterStatus();
420 Collection<ServerName> servers = status.getServers();
421 return servers == null || servers.isEmpty()? 0: servers.size();
422 }
423 }
424
425 private static byte [] readFile(final FileSystem fs, final Path path) throws IOException {
426 FSDataInputStream tmpIn = fs.open(path);
427 try {
428 byte [] rawData = new byte[tmpIn.available()];
429 tmpIn.readFully(rawData);
430 return rawData;
431 } finally {
432 tmpIn.close();
433 }
434 }
435
436 static void rollingSplit(TableName tableName, SplitAlgorithm splitAlgo, Configuration conf)
437 throws IOException, InterruptedException {
438 final int minOS = conf.getInt("split.outstanding", 2);
439 try (Connection connection = ConnectionFactory.createConnection(conf)) {
440
441 final int MAX_OUTSTANDING = Math.max(getRegionServerCount(connection) / 2, minOS);
442
443 Path hbDir = FSUtils.getRootDir(conf);
444 Path tableDir = FSUtils.getTableDir(hbDir, tableName);
445 Path splitFile = new Path(tableDir, "_balancedSplit");
446 FileSystem fs = FileSystem.get(conf);
447
448
449 LinkedList<Pair<byte[], byte[]>> tmpRegionSet = null;
450 try (Table table = connection.getTable(tableName)) {
451 tmpRegionSet = getSplits(connection, tableName, splitAlgo);
452 }
453 LinkedList<Pair<byte[], byte[]>> outstanding = Lists.newLinkedList();
454 int splitCount = 0;
455 final int origCount = tmpRegionSet.size();
456
457
458
459
460 LOG.debug("Bucketing regions by regionserver...");
461 TreeMap<String, LinkedList<Pair<byte[], byte[]>>> daughterRegions =
462 Maps.newTreeMap();
463
464 try (RegionLocator regionLocator = connection.getRegionLocator(tableName)) {
465 for (Pair<byte[], byte[]> dr : tmpRegionSet) {
466 String rsLocation = regionLocator.getRegionLocation(dr.getSecond()).getHostnamePort();
467 if (!daughterRegions.containsKey(rsLocation)) {
468 LinkedList<Pair<byte[], byte[]>> entry = Lists.newLinkedList();
469 daughterRegions.put(rsLocation, entry);
470 }
471 daughterRegions.get(rsLocation).add(dr);
472 }
473 LOG.debug("Done with bucketing. Split time!");
474 long startTime = System.currentTimeMillis();
475
476
477 byte[] rawData = readFile(fs, splitFile);
478
479 FSDataOutputStream splitOut = fs.create(splitFile);
480 try {
481 splitOut.write(rawData);
482
483 try {
484
485 while (!daughterRegions.isEmpty()) {
486 LOG.debug(daughterRegions.size() + " RS have regions to splt.");
487
488
489 final TreeMap<ServerName, Integer> rsSizes = Maps.newTreeMap();
490 List<HRegionLocation> hrls = regionLocator.getAllRegionLocations();
491 for (HRegionLocation hrl: hrls) {
492 ServerName sn = hrl.getServerName();
493 if (rsSizes.containsKey(sn)) {
494 rsSizes.put(sn, rsSizes.get(sn) + 1);
495 } else {
496 rsSizes.put(sn, 1);
497 }
498 }
499
500
501 List<String> serversLeft = Lists.newArrayList(daughterRegions .keySet());
502 Collections.sort(serversLeft, new Comparator<String>() {
503 public int compare(String o1, String o2) {
504 return rsSizes.get(o1).compareTo(rsSizes.get(o2));
505 }
506 });
507
508
509
510 for (String rsLoc : serversLeft) {
511 Pair<byte[], byte[]> dr = null;
512
513
514 LOG.debug("Finding a region on " + rsLoc);
515 LinkedList<Pair<byte[], byte[]>> regionList = daughterRegions.get(rsLoc);
516 while (!regionList.isEmpty()) {
517 dr = regionList.pop();
518
519
520 byte[] split = dr.getSecond();
521 HRegionLocation regionLoc = regionLocator.getRegionLocation(split);
522
523
524 String newRs = regionLoc.getHostnamePort();
525 if (newRs.compareTo(rsLoc) != 0) {
526 LOG.debug("Region with " + splitAlgo.rowToStr(split)
527 + " moved to " + newRs + ". Relocating...");
528
529 if (!daughterRegions.containsKey(newRs)) {
530 LinkedList<Pair<byte[], byte[]>> entry = Lists.newLinkedList();
531 daughterRegions.put(newRs, entry);
532 }
533 daughterRegions.get(newRs).add(dr);
534 dr = null;
535 continue;
536 }
537
538
539 byte[] sk = regionLoc.getRegionInfo().getStartKey();
540 if (sk.length != 0) {
541 if (Bytes.equals(split, sk)) {
542 LOG.debug("Region already split on "
543 + splitAlgo.rowToStr(split) + ". Skipping this region...");
544 ++splitCount;
545 dr = null;
546 continue;
547 }
548 byte[] start = dr.getFirst();
549 Preconditions.checkArgument(Bytes.equals(start, sk), splitAlgo
550 .rowToStr(start) + " != " + splitAlgo.rowToStr(sk));
551 }
552
553
554 break;
555 }
556 if (regionList.isEmpty()) {
557 daughterRegions.remove(rsLoc);
558 }
559 if (dr == null)
560 continue;
561
562
563 byte[] split = dr.getSecond();
564 LOG.debug("Splitting at " + splitAlgo.rowToStr(split));
565 try (Admin admin = connection.getAdmin()) {
566 admin.split(tableName, split);
567 }
568
569 LinkedList<Pair<byte[], byte[]>> finished = Lists.newLinkedList();
570 LinkedList<Pair<byte[], byte[]>> local_finished = Lists.newLinkedList();
571 if (conf.getBoolean("split.verify", true)) {
572
573 outstanding.addLast(dr);
574
575 while (outstanding.size() >= MAX_OUTSTANDING) {
576 LOG.debug("Wait for outstanding splits " + outstanding.size());
577 local_finished = splitScan(outstanding, connection, tableName, splitAlgo);
578 if (local_finished.isEmpty()) {
579 Thread.sleep(30 * 1000);
580 } else {
581 finished.addAll(local_finished);
582 outstanding.removeAll(local_finished);
583 LOG.debug(local_finished.size() + " outstanding splits finished");
584 }
585 }
586 } else {
587 finished.add(dr);
588 }
589
590
591 for (Pair<byte[], byte[]> region : finished) {
592 splitOut.writeChars("- " + splitAlgo.rowToStr(region.getFirst())
593 + " " + splitAlgo.rowToStr(region.getSecond()) + "\n");
594 splitCount++;
595 if (splitCount % 10 == 0) {
596 long tDiff = (System.currentTimeMillis() - startTime)
597 / splitCount;
598 LOG.debug("STATUS UPDATE: " + splitCount + " / " + origCount
599 + ". Avg Time / Split = "
600 + org.apache.hadoop.util.StringUtils.formatTime(tDiff));
601 }
602 }
603 }
604 }
605 if (conf.getBoolean("split.verify", true)) {
606 while (!outstanding.isEmpty()) {
607 LOG.debug("Finally Wait for outstanding splits " + outstanding.size());
608 LinkedList<Pair<byte[], byte[]>> finished = splitScan(outstanding,
609 connection, tableName, splitAlgo);
610 if (finished.isEmpty()) {
611 Thread.sleep(30 * 1000);
612 } else {
613 outstanding.removeAll(finished);
614 for (Pair<byte[], byte[]> region : finished) {
615 splitOut.writeChars("- " + splitAlgo.rowToStr(region.getFirst())
616 + " " + splitAlgo.rowToStr(region.getSecond()) + "\n");
617 splitCount++;
618 }
619 LOG.debug("Finally " + finished.size() + " outstanding splits finished");
620 }
621 }
622 }
623 LOG.debug("All regions have been successfully split!");
624 } finally {
625 long tDiff = System.currentTimeMillis() - startTime;
626 LOG.debug("TOTAL TIME = "
627 + org.apache.hadoop.util.StringUtils.formatTime(tDiff));
628 LOG.debug("Splits = " + splitCount);
629 if (0 < splitCount) {
630 LOG.debug("Avg Time / Split = "
631 + org.apache.hadoop.util.StringUtils.formatTime(tDiff / splitCount));
632 }
633 }
634 } finally {
635 splitOut.close();
636 fs.delete(splitFile, false);
637 }
638 }
639 }
640 }
641
642
643
644
645
646 public static SplitAlgorithm newSplitAlgoInstance(Configuration conf,
647 String splitClassName) throws IOException {
648 Class<?> splitClass;
649
650
651
652 if(splitClassName.equals(HexStringSplit.class.getSimpleName())) {
653 splitClass = HexStringSplit.class;
654 } else if (splitClassName.equals(UniformSplit.class.getSimpleName())) {
655 splitClass = UniformSplit.class;
656 } else {
657 try {
658 splitClass = conf.getClassByName(splitClassName);
659 } catch (ClassNotFoundException e) {
660 throw new IOException("Couldn't load split class " + splitClassName, e);
661 }
662 if(splitClass == null) {
663 throw new IOException("Failed loading split class " + splitClassName);
664 }
665 if(!SplitAlgorithm.class.isAssignableFrom(splitClass)) {
666 throw new IOException(
667 "Specified split class doesn't implement SplitAlgorithm");
668 }
669 }
670 try {
671 return splitClass.asSubclass(SplitAlgorithm.class).newInstance();
672 } catch (Exception e) {
673 throw new IOException("Problem loading split algorithm: ", e);
674 }
675 }
676
677 static LinkedList<Pair<byte[], byte[]>> splitScan(
678 LinkedList<Pair<byte[], byte[]>> regionList,
679 final Connection connection,
680 final TableName tableName,
681 SplitAlgorithm splitAlgo)
682 throws IOException, InterruptedException {
683 LinkedList<Pair<byte[], byte[]>> finished = Lists.newLinkedList();
684 LinkedList<Pair<byte[], byte[]>> logicalSplitting = Lists.newLinkedList();
685 LinkedList<Pair<byte[], byte[]>> physicalSplitting = Lists.newLinkedList();
686
687
688 Pair<Path, Path> tableDirAndSplitFile =
689 getTableDirAndSplitFile(connection.getConfiguration(), tableName);
690 Path tableDir = tableDirAndSplitFile.getFirst();
691 FileSystem fs = tableDir.getFileSystem(connection.getConfiguration());
692
693 ((ClusterConnection)connection).clearRegionCache();
694 HTableDescriptor htd = null;
695 try (Table table = connection.getTable(tableName)) {
696 htd = table.getTableDescriptor();
697 }
698 try (RegionLocator regionLocator = connection.getRegionLocator(tableName)) {
699
700
701 for (Pair<byte[], byte[]> region : regionList) {
702 byte[] start = region.getFirst();
703 byte[] split = region.getSecond();
704
705
706 try {
707 HRegionInfo dri = regionLocator.getRegionLocation(split).getRegionInfo();
708 if (dri.isOffline() || !Bytes.equals(dri.getStartKey(), split)) {
709 logicalSplitting.add(region);
710 continue;
711 }
712 } catch (NoServerForRegionException nsfre) {
713
714 LOG.info(nsfre);
715 logicalSplitting.add(region);
716 continue;
717 }
718
719 try {
720
721
722 LinkedList<HRegionInfo> check = Lists.newLinkedList();
723 check.add(regionLocator.getRegionLocation(start).getRegionInfo());
724 check.add(regionLocator.getRegionLocation(split).getRegionInfo());
725 for (HRegionInfo hri : check.toArray(new HRegionInfo[check.size()])) {
726 byte[] sk = hri.getStartKey();
727 if (sk.length == 0)
728 sk = splitAlgo.firstRow();
729
730 HRegionFileSystem regionFs = HRegionFileSystem.openRegionFromFileSystem(
731 connection.getConfiguration(), fs, tableDir, hri, true);
732
733
734 boolean refFound = false;
735 for (HColumnDescriptor c : htd.getFamilies()) {
736 if ((refFound = regionFs.hasReferences(c.getNameAsString()))) {
737 break;
738 }
739 }
740
741
742 if (!refFound) {
743 check.remove(hri);
744 }
745 }
746 if (check.isEmpty()) {
747 finished.add(region);
748 } else {
749 physicalSplitting.add(region);
750 }
751 } catch (NoServerForRegionException nsfre) {
752 LOG.debug("No Server Exception thrown for: " + splitAlgo.rowToStr(start));
753 physicalSplitting.add(region);
754 ((ClusterConnection)connection).clearRegionCache();
755 }
756 }
757
758 LOG.debug("Split Scan: " + finished.size() + " finished / "
759 + logicalSplitting.size() + " split wait / "
760 + physicalSplitting.size() + " reference wait");
761
762 return finished;
763 }
764 }
765
766
767
768
769
770
771
772 private static Pair<Path, Path> getTableDirAndSplitFile(final Configuration conf,
773 final TableName tableName)
774 throws IOException {
775 Path hbDir = FSUtils.getRootDir(conf);
776 Path tableDir = FSUtils.getTableDir(hbDir, tableName);
777 Path splitFile = new Path(tableDir, "_balancedSplit");
778 return new Pair<Path, Path>(tableDir, splitFile);
779 }
780
781 static LinkedList<Pair<byte[], byte[]>> getSplits(final Connection connection,
782 TableName tableName, SplitAlgorithm splitAlgo)
783 throws IOException {
784 Pair<Path, Path> tableDirAndSplitFile =
785 getTableDirAndSplitFile(connection.getConfiguration(), tableName);
786 Path tableDir = tableDirAndSplitFile.getFirst();
787 Path splitFile = tableDirAndSplitFile.getSecond();
788
789 FileSystem fs = tableDir.getFileSystem(connection.getConfiguration());
790
791
792 Set<Pair<String, String>> daughterRegions = Sets.newHashSet();
793
794
795 if (!fs.exists(splitFile)) {
796
797 LOG.debug("No " + splitFile.getName() + " file. Calculating splits ");
798
799
800 Set<Pair<byte[], byte[]>> rows = Sets.newHashSet();
801 Pair<byte[][], byte[][]> tmp = null;
802 try (RegionLocator regionLocator = connection.getRegionLocator(tableName)) {
803 tmp = regionLocator.getStartEndKeys();
804 }
805 Preconditions.checkArgument(tmp.getFirst().length == tmp.getSecond().length,
806 "Start and End rows should be equivalent");
807 for (int i = 0; i < tmp.getFirst().length; ++i) {
808 byte[] start = tmp.getFirst()[i], end = tmp.getSecond()[i];
809 if (start.length == 0)
810 start = splitAlgo.firstRow();
811 if (end.length == 0)
812 end = splitAlgo.lastRow();
813 rows.add(Pair.newPair(start, end));
814 }
815 LOG.debug("Table " + tableName + " has " + rows.size() + " regions that will be split.");
816
817
818 Path tmpFile = new Path(tableDir, "_balancedSplit_prepare");
819 FSDataOutputStream tmpOut = fs.create(tmpFile);
820
821
822 for (Pair<byte[], byte[]> r : rows) {
823 byte[] splitPoint = splitAlgo.split(r.getFirst(), r.getSecond());
824 String startStr = splitAlgo.rowToStr(r.getFirst());
825 String splitStr = splitAlgo.rowToStr(splitPoint);
826 daughterRegions.add(Pair.newPair(startStr, splitStr));
827 LOG.debug("Will Split [" + startStr + " , "
828 + splitAlgo.rowToStr(r.getSecond()) + ") at " + splitStr);
829 tmpOut.writeChars("+ " + startStr + splitAlgo.separator() + splitStr
830 + "\n");
831 }
832 tmpOut.close();
833 fs.rename(tmpFile, splitFile);
834 } else {
835 LOG.debug("_balancedSplit file found. Replay log to restore state...");
836 FSUtils.getInstance(fs, connection.getConfiguration())
837 .recoverFileLease(fs, splitFile, connection.getConfiguration(), null);
838
839
840 FSDataInputStream tmpIn = fs.open(splitFile);
841 StringBuilder sb = new StringBuilder(tmpIn.available());
842 while (tmpIn.available() > 0) {
843 sb.append(tmpIn.readChar());
844 }
845 tmpIn.close();
846 for (String line : sb.toString().split("\n")) {
847 String[] cmd = line.split(splitAlgo.separator());
848 Preconditions.checkArgument(3 == cmd.length);
849 byte[] start = splitAlgo.strToRow(cmd[1]);
850 String startStr = splitAlgo.rowToStr(start);
851 byte[] splitPoint = splitAlgo.strToRow(cmd[2]);
852 String splitStr = splitAlgo.rowToStr(splitPoint);
853 Pair<String, String> r = Pair.newPair(startStr, splitStr);
854 if (cmd[0].equals("+")) {
855 LOG.debug("Adding: " + r);
856 daughterRegions.add(r);
857 } else {
858 LOG.debug("Removing: " + r);
859 Preconditions.checkArgument(cmd[0].equals("-"),
860 "Unknown option: " + cmd[0]);
861 Preconditions.checkState(daughterRegions.contains(r),
862 "Missing row: " + r);
863 daughterRegions.remove(r);
864 }
865 }
866 LOG.debug("Done reading. " + daughterRegions.size() + " regions left.");
867 }
868 LinkedList<Pair<byte[], byte[]>> ret = Lists.newLinkedList();
869 for (Pair<String, String> r : daughterRegions) {
870 ret.add(Pair.newPair(splitAlgo.strToRow(r.getFirst()), splitAlgo
871 .strToRow(r.getSecond())));
872 }
873 return ret;
874 }
875
876
877
878
879
880
881
882
883
884
885
886
887 public static class HexStringSplit implements SplitAlgorithm {
888 final static String DEFAULT_MIN_HEX = "00000000";
889 final static String DEFAULT_MAX_HEX = "FFFFFFFF";
890
891 String firstRow = DEFAULT_MIN_HEX;
892 BigInteger firstRowInt = BigInteger.ZERO;
893 String lastRow = DEFAULT_MAX_HEX;
894 BigInteger lastRowInt = new BigInteger(lastRow, 16);
895 int rowComparisonLength = lastRow.length();
896
897 public byte[] split(byte[] start, byte[] end) {
898 BigInteger s = convertToBigInteger(start);
899 BigInteger e = convertToBigInteger(end);
900 Preconditions.checkArgument(!e.equals(BigInteger.ZERO));
901 return convertToByte(split2(s, e));
902 }
903
904 public byte[][] split(int n) {
905 Preconditions.checkArgument(lastRowInt.compareTo(firstRowInt) > 0,
906 "last row (%s) is configured less than first row (%s)", lastRow,
907 firstRow);
908
909 BigInteger range = lastRowInt.subtract(firstRowInt).add(BigInteger.ONE);
910 Preconditions.checkState(range.compareTo(BigInteger.valueOf(n)) >= 0,
911 "split granularity (%s) is greater than the range (%s)", n, range);
912
913 BigInteger[] splits = new BigInteger[n - 1];
914 BigInteger sizeOfEachSplit = range.divide(BigInteger.valueOf(n));
915 for (int i = 1; i < n; i++) {
916
917
918 splits[i - 1] = firstRowInt.add(sizeOfEachSplit.multiply(BigInteger
919 .valueOf(i)));
920 }
921 return convertToBytes(splits);
922 }
923
924 public byte[] firstRow() {
925 return convertToByte(firstRowInt);
926 }
927
928 public byte[] lastRow() {
929 return convertToByte(lastRowInt);
930 }
931
932 public void setFirstRow(String userInput) {
933 firstRow = userInput;
934 firstRowInt = new BigInteger(firstRow, 16);
935 }
936
937 public void setLastRow(String userInput) {
938 lastRow = userInput;
939 lastRowInt = new BigInteger(lastRow, 16);
940
941 rowComparisonLength = lastRow.length();
942 }
943
944 public byte[] strToRow(String in) {
945 return convertToByte(new BigInteger(in, 16));
946 }
947
948 public String rowToStr(byte[] row) {
949 return Bytes.toStringBinary(row);
950 }
951
952 public String separator() {
953 return " ";
954 }
955
956 @Override
957 public void setFirstRow(byte[] userInput) {
958 firstRow = Bytes.toString(userInput);
959 }
960
961 @Override
962 public void setLastRow(byte[] userInput) {
963 lastRow = Bytes.toString(userInput);
964 }
965
966
967
968
969
970
971
972
973 public BigInteger split2(BigInteger a, BigInteger b) {
974 return a.add(b).divide(BigInteger.valueOf(2)).abs();
975 }
976
977
978
979
980
981
982
983 public byte[][] convertToBytes(BigInteger[] bigIntegers) {
984 byte[][] returnBytes = new byte[bigIntegers.length][];
985 for (int i = 0; i < bigIntegers.length; i++) {
986 returnBytes[i] = convertToByte(bigIntegers[i]);
987 }
988 return returnBytes;
989 }
990
991
992
993
994
995
996
997
998 public static byte[] convertToByte(BigInteger bigInteger, int pad) {
999 String bigIntegerString = bigInteger.toString(16);
1000 bigIntegerString = StringUtils.leftPad(bigIntegerString, pad, '0');
1001 return Bytes.toBytes(bigIntegerString);
1002 }
1003
1004
1005
1006
1007
1008
1009
1010 public byte[] convertToByte(BigInteger bigInteger) {
1011 return convertToByte(bigInteger, rowComparisonLength);
1012 }
1013
1014
1015
1016
1017
1018
1019
1020 public BigInteger convertToBigInteger(byte[] row) {
1021 return (row.length > 0) ? new BigInteger(Bytes.toString(row), 16)
1022 : BigInteger.ZERO;
1023 }
1024
1025 @Override
1026 public String toString() {
1027 return this.getClass().getSimpleName() + " [" + rowToStr(firstRow())
1028 + "," + rowToStr(lastRow()) + "]";
1029 }
1030 }
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040 public static class UniformSplit implements SplitAlgorithm {
1041 static final byte xFF = (byte) 0xFF;
1042 byte[] firstRowBytes = ArrayUtils.EMPTY_BYTE_ARRAY;
1043 byte[] lastRowBytes =
1044 new byte[] {xFF, xFF, xFF, xFF, xFF, xFF, xFF, xFF};
1045 public byte[] split(byte[] start, byte[] end) {
1046 return Bytes.split(start, end, 1)[1];
1047 }
1048
1049 @Override
1050 public byte[][] split(int numRegions) {
1051 Preconditions.checkArgument(
1052 Bytes.compareTo(lastRowBytes, firstRowBytes) > 0,
1053 "last row (%s) is configured less than first row (%s)",
1054 Bytes.toStringBinary(lastRowBytes),
1055 Bytes.toStringBinary(firstRowBytes));
1056
1057 byte[][] splits = Bytes.split(firstRowBytes, lastRowBytes, true,
1058 numRegions - 1);
1059 Preconditions.checkState(splits != null,
1060 "Could not split region with given user input: " + this);
1061
1062
1063 return Arrays.copyOfRange(splits, 1, splits.length - 1);
1064 }
1065
1066 @Override
1067 public byte[] firstRow() {
1068 return firstRowBytes;
1069 }
1070
1071 @Override
1072 public byte[] lastRow() {
1073 return lastRowBytes;
1074 }
1075
1076 @Override
1077 public void setFirstRow(String userInput) {
1078 firstRowBytes = Bytes.toBytesBinary(userInput);
1079 }
1080
1081 @Override
1082 public void setLastRow(String userInput) {
1083 lastRowBytes = Bytes.toBytesBinary(userInput);
1084 }
1085
1086
1087 @Override
1088 public void setFirstRow(byte[] userInput) {
1089 firstRowBytes = userInput;
1090 }
1091
1092 @Override
1093 public void setLastRow(byte[] userInput) {
1094 lastRowBytes = userInput;
1095 }
1096
1097 @Override
1098 public byte[] strToRow(String input) {
1099 return Bytes.toBytesBinary(input);
1100 }
1101
1102 @Override
1103 public String rowToStr(byte[] row) {
1104 return Bytes.toStringBinary(row);
1105 }
1106
1107 @Override
1108 public String separator() {
1109 return ",";
1110 }
1111
1112 @Override
1113 public String toString() {
1114 return this.getClass().getSimpleName() + " [" + rowToStr(firstRow())
1115 + "," + rowToStr(lastRow()) + "]";
1116 }
1117 }
1118 }