1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.mapreduce;
20
21 import java.io.FileNotFoundException;
22 import java.io.IOException;
23 import java.nio.ByteBuffer;
24 import java.util.ArrayList;
25 import java.util.Arrays;
26 import java.util.Collection;
27 import java.util.Deque;
28 import java.util.HashSet;
29 import java.util.LinkedList;
30 import java.util.List;
31 import java.util.Map;
32 import java.util.Map.Entry;
33 import java.util.Set;
34 import java.util.TreeMap;
35 import java.util.concurrent.Callable;
36 import java.util.concurrent.ExecutionException;
37 import java.util.concurrent.ExecutorService;
38 import java.util.concurrent.Future;
39 import java.util.concurrent.LinkedBlockingQueue;
40 import java.util.concurrent.ThreadPoolExecutor;
41 import java.util.concurrent.TimeUnit;
42 import java.util.concurrent.atomic.AtomicLong;
43
44 import org.apache.commons.logging.Log;
45 import org.apache.commons.logging.LogFactory;
46 import org.apache.hadoop.classification.InterfaceAudience;
47 import org.apache.hadoop.classification.InterfaceStability;
48 import org.apache.hadoop.conf.Configuration;
49 import org.apache.hadoop.conf.Configured;
50 import org.apache.hadoop.fs.FileStatus;
51 import org.apache.hadoop.fs.FileSystem;
52 import org.apache.hadoop.fs.FileUtil;
53 import org.apache.hadoop.fs.Path;
54 import org.apache.hadoop.hbase.HBaseConfiguration;
55 import org.apache.hadoop.hbase.HColumnDescriptor;
56 import org.apache.hadoop.hbase.HConstants;
57 import org.apache.hadoop.hbase.HTableDescriptor;
58 import org.apache.hadoop.hbase.KeyValue;
59 import org.apache.hadoop.hbase.exceptions.TableNotFoundException;
60 import org.apache.hadoop.hbase.client.HBaseAdmin;
61 import org.apache.hadoop.hbase.client.HConnection;
62 import org.apache.hadoop.hbase.client.HTable;
63 import org.apache.hadoop.hbase.client.ServerCallable;
64 import org.apache.hadoop.hbase.client.coprocessor.SecureBulkLoadClient;
65 import org.apache.hadoop.hbase.io.HalfStoreFileReader;
66 import org.apache.hadoop.hbase.io.Reference;
67 import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
68 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
69 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
70 import org.apache.hadoop.hbase.io.hfile.HFile;
71 import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
72 import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl;
73 import org.apache.hadoop.hbase.io.hfile.HFileScanner;
74 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
75 import org.apache.hadoop.hbase.regionserver.BloomType;
76 import org.apache.hadoop.hbase.regionserver.HStore;
77 import org.apache.hadoop.hbase.regionserver.StoreFile;
78 import org.apache.hadoop.hbase.security.User;
79 import org.apache.hadoop.hbase.util.Bytes;
80 import org.apache.hadoop.hbase.util.Pair;
81 import org.apache.hadoop.security.token.Token;
82 import org.apache.hadoop.util.Tool;
83 import org.apache.hadoop.util.ToolRunner;
84
85 import com.google.common.collect.HashMultimap;
86 import com.google.common.collect.Multimap;
87 import com.google.common.collect.Multimaps;
88 import com.google.common.util.concurrent.ThreadFactoryBuilder;
89
90
91
92
93
94 @InterfaceAudience.Public
95 @InterfaceStability.Stable
96 public class LoadIncrementalHFiles extends Configured implements Tool {
97 private static final Log LOG = LogFactory.getLog(LoadIncrementalHFiles.class);
98 static final AtomicLong regionCount = new AtomicLong(0);
99 private HBaseAdmin hbAdmin;
100 private Configuration cfg;
101
102 public static final String NAME = "completebulkload";
103 private static final String ASSIGN_SEQ_IDS = "hbase.mapreduce.bulkload.assign.sequenceNumbers";
104 private boolean assignSeqIds;
105
106 private boolean useSecure;
107 private Token<?> userToken;
108 private String bulkToken;
109
110
111 LoadIncrementalHFiles(Configuration conf, Boolean useSecure) throws Exception {
112 super(conf);
113 this.cfg = conf;
114 this.hbAdmin = new HBaseAdmin(conf);
115
116 this.useSecure = useSecure != null ? useSecure : User.isHBaseSecurityEnabled(conf);
117 }
118
119 public LoadIncrementalHFiles(Configuration conf) throws Exception {
120 this(conf, null);
121 assignSeqIds = conf.getBoolean(ASSIGN_SEQ_IDS, true);
122 }
123
124 private void usage() {
125 System.err.println("usage: " + NAME +
126 " /path/to/hfileoutputformat-output " +
127 "tablename");
128 }
129
130
131
132
133
134
135
136
137
138 static class LoadQueueItem {
139 final byte[] family;
140 final Path hfilePath;
141
142 public LoadQueueItem(byte[] family, Path hfilePath) {
143 this.family = family;
144 this.hfilePath = hfilePath;
145 }
146
147 public String toString() {
148 return "family:"+ Bytes.toString(family) + " path:" + hfilePath.toString();
149 }
150 }
151
152
153
154
155
156 private void discoverLoadQueue(Deque<LoadQueueItem> ret, Path hfofDir)
157 throws IOException {
158 FileSystem fs = hfofDir.getFileSystem(getConf());
159
160 if (!fs.exists(hfofDir)) {
161 throw new FileNotFoundException("HFileOutputFormat dir " +
162 hfofDir + " not found");
163 }
164
165 FileStatus[] familyDirStatuses = fs.listStatus(hfofDir);
166 if (familyDirStatuses == null) {
167 throw new FileNotFoundException("No families found in " + hfofDir);
168 }
169
170 for (FileStatus stat : familyDirStatuses) {
171 if (!stat.isDir()) {
172 LOG.warn("Skipping non-directory " + stat.getPath());
173 continue;
174 }
175 Path familyDir = stat.getPath();
176
177 if (familyDir.getName().startsWith("_")) continue;
178 byte[] family = familyDir.getName().getBytes();
179 Path[] hfiles = FileUtil.stat2Paths(fs.listStatus(familyDir));
180 for (Path hfile : hfiles) {
181 if (hfile.getName().startsWith("_")) continue;
182 ret.add(new LoadQueueItem(family, hfile));
183 }
184 }
185 }
186
187
188
189
190
191
192
193
194
195
196 public void doBulkLoad(Path hfofDir, final HTable table)
197 throws TableNotFoundException, IOException
198 {
199 final HConnection conn = table.getConnection();
200
201 if (!conn.isTableAvailable(table.getTableName())) {
202 throw new TableNotFoundException("Table " +
203 Bytes.toStringBinary(table.getTableName()) +
204 "is not currently available.");
205 }
206
207
208 int nrThreads = cfg.getInt("hbase.loadincremental.threads.max",
209 Runtime.getRuntime().availableProcessors());
210 ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
211 builder.setNameFormat("LoadIncrementalHFiles-%1$d");
212 ExecutorService pool = new ThreadPoolExecutor(nrThreads, nrThreads,
213 60, TimeUnit.SECONDS,
214 new LinkedBlockingQueue<Runnable>(),
215 builder.build());
216 ((ThreadPoolExecutor)pool).allowCoreThreadTimeOut(true);
217
218
219
220 Deque<LoadQueueItem> queue = new LinkedList<LoadQueueItem>();
221 try {
222 discoverLoadQueue(queue, hfofDir);
223
224 Collection<HColumnDescriptor> families = table.getTableDescriptor().getFamilies();
225 ArrayList<String> familyNames = new ArrayList<String>();
226 for (HColumnDescriptor family : families) {
227 familyNames.add(family.getNameAsString());
228 }
229 ArrayList<String> unmatchedFamilies = new ArrayList<String>();
230 for (LoadQueueItem lqi : queue) {
231 String familyNameInHFile = Bytes.toString(lqi.family);
232 if (!familyNames.contains(familyNameInHFile)) {
233 unmatchedFamilies.add(familyNameInHFile);
234 }
235 }
236 if (unmatchedFamilies.size() > 0) {
237 String msg =
238 "Unmatched family names found: unmatched family names in HFiles to be bulkloaded: "
239 + unmatchedFamilies + "; valid family names of table "
240 + Bytes.toString(table.getTableName()) + " are: " + familyNames;
241 LOG.error(msg);
242 throw new IOException(msg);
243 }
244 int count = 0;
245
246 if (queue.isEmpty()) {
247 LOG.warn("Bulk load operation did not find any files to load in " +
248 "directory " + hfofDir.toUri() + ". Does it contain files in " +
249 "subdirectories that correspond to column family names?");
250 return;
251 }
252
253
254
255 if(useSecure) {
256 FileSystem fs = FileSystem.get(cfg);
257
258
259 if(User.isSecurityEnabled()) {
260 userToken = fs.getDelegationToken("renewer");
261 }
262 bulkToken = new SecureBulkLoadClient(table).prepareBulkLoad(table.getTableName());
263 }
264
265
266 while (!queue.isEmpty()) {
267
268 final Pair<byte[][], byte[][]> startEndKeys = table.getStartEndKeys();
269 if (count != 0) {
270 LOG.info("Split occured while grouping HFiles, retry attempt " +
271 + count + " with " + queue.size() + " files remaining to group or split");
272 }
273
274 int maxRetries = cfg.getInt("hbase.bulkload.retries.number", 0);
275 if (maxRetries != 0 && count >= maxRetries) {
276 LOG.error("Retry attempted " + count + " times without completing, bailing out");
277 return;
278 }
279 count++;
280
281
282 Multimap<ByteBuffer, LoadQueueItem> regionGroups = groupOrSplitPhase(table,
283 pool, queue, startEndKeys);
284
285 bulkLoadPhase(table, conn, pool, queue, regionGroups);
286
287
288
289
290 }
291
292 } finally {
293 if(useSecure) {
294 if(userToken != null) {
295 try {
296 userToken.cancel(cfg);
297 } catch (Exception e) {
298 LOG.warn("Failed to cancel HDFS delegation token.", e);
299 }
300 }
301 if(bulkToken != null) {
302 new SecureBulkLoadClient(table).cleanupBulkLoad(bulkToken);
303 }
304 }
305 pool.shutdown();
306 if (queue != null && !queue.isEmpty()) {
307 StringBuilder err = new StringBuilder();
308 err.append("-------------------------------------------------\n");
309 err.append("Bulk load aborted with some files not yet loaded:\n");
310 err.append("-------------------------------------------------\n");
311 for (LoadQueueItem q : queue) {
312 err.append(" ").append(q.hfilePath).append('\n');
313 }
314 LOG.error(err);
315 }
316 }
317
318 if (queue != null && !queue.isEmpty()) {
319 throw new RuntimeException("Bulk load aborted with some files not yet loaded."
320 + "Please check log for more details.");
321 }
322 }
323
324
325
326
327
328
329 protected void bulkLoadPhase(final HTable table, final HConnection conn,
330 ExecutorService pool, Deque<LoadQueueItem> queue,
331 final Multimap<ByteBuffer, LoadQueueItem> regionGroups) throws IOException {
332
333 Set<Future<List<LoadQueueItem>>> loadingFutures = new HashSet<Future<List<LoadQueueItem>>>();
334 for (Entry<ByteBuffer, ? extends Collection<LoadQueueItem>> e: regionGroups.asMap().entrySet()) {
335 final byte[] first = e.getKey().array();
336 final Collection<LoadQueueItem> lqis = e.getValue();
337
338 final Callable<List<LoadQueueItem>> call = new Callable<List<LoadQueueItem>>() {
339 public List<LoadQueueItem> call() throws Exception {
340 List<LoadQueueItem> toRetry = tryAtomicRegionLoad(conn, table.getTableName(), first, lqis);
341 return toRetry;
342 }
343 };
344 loadingFutures.add(pool.submit(call));
345 }
346
347
348 for (Future<List<LoadQueueItem>> future : loadingFutures) {
349 try {
350 List<LoadQueueItem> toRetry = future.get();
351
352
353 queue.addAll(toRetry);
354
355 } catch (ExecutionException e1) {
356 Throwable t = e1.getCause();
357 if (t instanceof IOException) {
358
359
360 throw new IOException("BulkLoad encountered an unrecoverable problem", t);
361 }
362 LOG.error("Unexpected execution exception during bulk load", e1);
363 throw new IllegalStateException(t);
364 } catch (InterruptedException e1) {
365 LOG.error("Unexpected interrupted exception during bulk load", e1);
366 throw new IllegalStateException(e1);
367 }
368 }
369 }
370
371
372
373
374
375 private Multimap<ByteBuffer, LoadQueueItem> groupOrSplitPhase(final HTable table,
376 ExecutorService pool, Deque<LoadQueueItem> queue,
377 final Pair<byte[][], byte[][]> startEndKeys) throws IOException {
378
379
380 Multimap<ByteBuffer, LoadQueueItem> rgs = HashMultimap.create();
381 final Multimap<ByteBuffer, LoadQueueItem> regionGroups = Multimaps.synchronizedMultimap(rgs);
382
383
384 Set<Future<List<LoadQueueItem>>> splittingFutures = new HashSet<Future<List<LoadQueueItem>>>();
385 while (!queue.isEmpty()) {
386 final LoadQueueItem item = queue.remove();
387
388 final Callable<List<LoadQueueItem>> call = new Callable<List<LoadQueueItem>>() {
389 public List<LoadQueueItem> call() throws Exception {
390 List<LoadQueueItem> splits = groupOrSplit(regionGroups, item, table, startEndKeys);
391 return splits;
392 }
393 };
394 splittingFutures.add(pool.submit(call));
395 }
396
397
398 for (Future<List<LoadQueueItem>> lqis : splittingFutures) {
399 try {
400 List<LoadQueueItem> splits = lqis.get();
401 if (splits != null) {
402 queue.addAll(splits);
403 }
404 } catch (ExecutionException e1) {
405 Throwable t = e1.getCause();
406 if (t instanceof IOException) {
407 LOG.error("IOException during splitting", e1);
408 throw (IOException)t;
409 }
410 LOG.error("Unexpected execution exception during splitting", e1);
411 throw new IllegalStateException(t);
412 } catch (InterruptedException e1) {
413 LOG.error("Unexpected interrupted exception during splitting", e1);
414 throw new IllegalStateException(e1);
415 }
416 }
417 return regionGroups;
418 }
419
420
421 String getUniqueName(byte[] tableName) {
422 String name = Bytes.toStringBinary(tableName) + "," + regionCount.incrementAndGet();
423 return name;
424 }
425
426 protected List<LoadQueueItem> splitStoreFile(final LoadQueueItem item,
427 final HTable table, byte[] startKey,
428 byte[] splitKey) throws IOException {
429 final Path hfilePath = item.hfilePath;
430
431
432
433 final Path tmpDir = new Path(item.hfilePath.getParent(), "_tmp");
434
435 LOG.info("HFile at " + hfilePath + " no longer fits inside a single " +
436 "region. Splitting...");
437
438 String uniqueName = getUniqueName(table.getTableName());
439 HColumnDescriptor familyDesc = table.getTableDescriptor().getFamily(item.family);
440 Path botOut = new Path(tmpDir, uniqueName + ".bottom");
441 Path topOut = new Path(tmpDir, uniqueName + ".top");
442 splitStoreFile(getConf(), hfilePath, familyDesc, splitKey,
443 botOut, topOut);
444
445
446
447 List<LoadQueueItem> lqis = new ArrayList<LoadQueueItem>(2);
448 lqis.add(new LoadQueueItem(item.family, botOut));
449 lqis.add(new LoadQueueItem(item.family, topOut));
450
451 LOG.info("Successfully split into new HFiles " + botOut + " and " + topOut);
452 return lqis;
453 }
454
455
456
457
458
459
460
461
462
463 protected List<LoadQueueItem> groupOrSplit(Multimap<ByteBuffer, LoadQueueItem> regionGroups,
464 final LoadQueueItem item, final HTable table,
465 final Pair<byte[][], byte[][]> startEndKeys)
466 throws IOException {
467 final Path hfilePath = item.hfilePath;
468 final FileSystem fs = hfilePath.getFileSystem(getConf());
469 HFile.Reader hfr = HFile.createReader(fs, hfilePath,
470 new CacheConfig(getConf()));
471 final byte[] first, last;
472 try {
473 hfr.loadFileInfo();
474 first = hfr.getFirstRowKey();
475 last = hfr.getLastRowKey();
476 } finally {
477 hfr.close();
478 }
479
480 LOG.info("Trying to load hfile=" + hfilePath +
481 " first=" + Bytes.toStringBinary(first) +
482 " last=" + Bytes.toStringBinary(last));
483 if (first == null || last == null) {
484 assert first == null && last == null;
485
486 LOG.info("hfile " + hfilePath + " has no entries, skipping");
487 return null;
488 }
489 if (Bytes.compareTo(first, last) > 0) {
490 throw new IllegalArgumentException(
491 "Invalid range: " + Bytes.toStringBinary(first) +
492 " > " + Bytes.toStringBinary(last));
493 }
494 int idx = Arrays.binarySearch(startEndKeys.getFirst(), first,
495 Bytes.BYTES_COMPARATOR);
496 if (idx < 0) {
497
498
499 idx = -(idx + 1) - 1;
500 }
501 final int indexForCallable = idx;
502 boolean lastKeyInRange =
503 Bytes.compareTo(last, startEndKeys.getSecond()[idx]) < 0 ||
504 Bytes.equals(startEndKeys.getSecond()[idx], HConstants.EMPTY_BYTE_ARRAY);
505 if (!lastKeyInRange) {
506 List<LoadQueueItem> lqis = splitStoreFile(item, table,
507 startEndKeys.getFirst()[indexForCallable],
508 startEndKeys.getSecond()[indexForCallable]);
509 return lqis;
510 }
511
512
513 regionGroups.put(ByteBuffer.wrap(startEndKeys.getFirst()[idx]), item);
514 return null;
515 }
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530 protected List<LoadQueueItem> tryAtomicRegionLoad(final HConnection conn,
531 byte[] tableName, final byte[] first, Collection<LoadQueueItem> lqis) throws IOException {
532
533 final List<Pair<byte[], String>> famPaths =
534 new ArrayList<Pair<byte[], String>>(lqis.size());
535 for (LoadQueueItem lqi : lqis) {
536 famPaths.add(Pair.newPair(lqi.family, lqi.hfilePath.toString()));
537 }
538
539 final ServerCallable<Boolean> svrCallable = new ServerCallable<Boolean>(conn,
540 tableName, first) {
541 @Override
542 public Boolean call() throws Exception {
543 SecureBulkLoadClient secureClient = null;
544 boolean success = false;
545
546 try {
547 LOG.debug("Going to connect to server " + location + " for row "
548 + Bytes.toStringBinary(row));
549 byte[] regionName = location.getRegionInfo().getRegionName();
550 if(!useSecure) {
551 success = ProtobufUtil.bulkLoadHFile(stub, famPaths, regionName, assignSeqIds);
552 } else {
553 HTable table = new HTable(conn.getConfiguration(), tableName);
554 secureClient = new SecureBulkLoadClient(table);
555 success = secureClient.bulkLoadHFiles(famPaths, userToken, bulkToken, location.getRegionInfo().getStartKey());
556 }
557 return success;
558 } finally {
559
560
561
562 if(secureClient != null && !success) {
563 FileSystem fs = FileSystem.get(cfg);
564 for(Pair<byte[], String> el : famPaths) {
565 Path hfileStagingPath = null;
566 Path hfileOrigPath = new Path(el.getSecond());
567 try {
568 hfileStagingPath= new Path(secureClient.getStagingPath(bulkToken, el.getFirst()),
569 hfileOrigPath.getName());
570 if(fs.rename(hfileStagingPath, hfileOrigPath)) {
571 LOG.debug("Moved back file " + hfileOrigPath + " from " +
572 hfileStagingPath);
573 } else if(fs.exists(hfileStagingPath)){
574 LOG.debug("Unable to move back file " + hfileOrigPath + " from " +
575 hfileStagingPath);
576 }
577 } catch(Exception ex) {
578 LOG.debug("Unable to move back file " + hfileOrigPath + " from " +
579 hfileStagingPath, ex);
580 }
581 }
582 }
583 }
584 }
585 };
586
587 try {
588 List<LoadQueueItem> toRetry = new ArrayList<LoadQueueItem>();
589 boolean success = svrCallable.withRetries();
590 if (!success) {
591 LOG.warn("Attempt to bulk load region containing "
592 + Bytes.toStringBinary(first) + " into table "
593 + Bytes.toStringBinary(tableName) + " with files " + lqis
594 + " failed. This is recoverable and they will be retried.");
595 toRetry.addAll(lqis);
596 }
597
598 return toRetry;
599 } catch (IOException e) {
600 LOG.error("Encountered unrecoverable error from region server", e);
601 throw e;
602 }
603 }
604
605
606
607
608
609 static void splitStoreFile(
610 Configuration conf, Path inFile,
611 HColumnDescriptor familyDesc, byte[] splitKey,
612 Path bottomOut, Path topOut) throws IOException
613 {
614
615 Reference topReference = Reference.createTopReference(splitKey);
616 Reference bottomReference = Reference.createBottomReference(splitKey);
617
618 copyHFileHalf(conf, inFile, topOut, topReference, familyDesc);
619 copyHFileHalf(conf, inFile, bottomOut, bottomReference, familyDesc);
620 }
621
622
623
624
625 private static void copyHFileHalf(
626 Configuration conf, Path inFile, Path outFile, Reference reference,
627 HColumnDescriptor familyDescriptor)
628 throws IOException {
629 FileSystem fs = inFile.getFileSystem(conf);
630 CacheConfig cacheConf = new CacheConfig(conf);
631 HalfStoreFileReader halfReader = null;
632 StoreFile.Writer halfWriter = null;
633 HFileDataBlockEncoder dataBlockEncoder = new HFileDataBlockEncoderImpl(
634 familyDescriptor.getDataBlockEncodingOnDisk(),
635 familyDescriptor.getDataBlockEncoding());
636 try {
637 halfReader = new HalfStoreFileReader(fs, inFile, cacheConf,
638 reference, DataBlockEncoding.NONE);
639 Map<byte[], byte[]> fileInfo = halfReader.loadFileInfo();
640
641 int blocksize = familyDescriptor.getBlocksize();
642 Algorithm compression = familyDescriptor.getCompression();
643 BloomType bloomFilterType = familyDescriptor.getBloomFilterType();
644
645 halfWriter = new StoreFile.WriterBuilder(conf, cacheConf,
646 fs, blocksize)
647 .withFilePath(outFile)
648 .withCompression(compression)
649 .withDataBlockEncoder(dataBlockEncoder)
650 .withBloomType(bloomFilterType)
651 .withChecksumType(HStore.getChecksumType(conf))
652 .withBytesPerChecksum(HStore.getBytesPerChecksum(conf))
653 .build();
654 HFileScanner scanner = halfReader.getScanner(false, false, false);
655 scanner.seekTo();
656 do {
657 KeyValue kv = scanner.getKeyValue();
658 halfWriter.append(kv);
659 } while (scanner.next());
660
661 for (Map.Entry<byte[],byte[]> entry : fileInfo.entrySet()) {
662 if (shouldCopyHFileMetaKey(entry.getKey())) {
663 halfWriter.appendFileInfo(entry.getKey(), entry.getValue());
664 }
665 }
666 } finally {
667 if (halfWriter != null) halfWriter.close();
668 if (halfReader != null) halfReader.close(cacheConf.shouldEvictOnClose());
669 }
670 }
671
672 private static boolean shouldCopyHFileMetaKey(byte[] key) {
673 return !HFile.isReservedFileInfoKey(key);
674 }
675
676 private boolean doesTableExist(String tableName) throws Exception {
677 return hbAdmin.tableExists(tableName);
678 }
679
680
681
682
683
684
685
686
687
688
689
690
691
692 public static byte[][] inferBoundaries(TreeMap<byte[], Integer> bdryMap) {
693 ArrayList<byte[]> keysArray = new ArrayList<byte[]>();
694 int runningValue = 0;
695 byte[] currStartKey = null;
696 boolean firstBoundary = true;
697
698 for (Map.Entry<byte[], Integer> item: bdryMap.entrySet()) {
699 if (runningValue == 0) currStartKey = item.getKey();
700 runningValue += item.getValue();
701 if (runningValue == 0) {
702 if (!firstBoundary) keysArray.add(currStartKey);
703 firstBoundary = false;
704 }
705 }
706
707 return keysArray.toArray(new byte[0][0]);
708 }
709
710
711
712
713
714 private void createTable(String tableName, String dirPath) throws Exception {
715 Path hfofDir = new Path(dirPath);
716 FileSystem fs = hfofDir.getFileSystem(getConf());
717
718 if (!fs.exists(hfofDir)) {
719 throw new FileNotFoundException("HFileOutputFormat dir " +
720 hfofDir + " not found");
721 }
722
723 FileStatus[] familyDirStatuses = fs.listStatus(hfofDir);
724 if (familyDirStatuses == null) {
725 throw new FileNotFoundException("No families found in " + hfofDir);
726 }
727
728 HTableDescriptor htd = new HTableDescriptor(tableName);
729 HColumnDescriptor hcd;
730
731
732
733 byte[][] keys;
734 TreeMap<byte[], Integer> map = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
735
736 for (FileStatus stat : familyDirStatuses) {
737 if (!stat.isDir()) {
738 LOG.warn("Skipping non-directory " + stat.getPath());
739 continue;
740 }
741 Path familyDir = stat.getPath();
742
743 if (familyDir.getName().startsWith("_")) continue;
744 byte[] family = familyDir.getName().getBytes();
745
746 hcd = new HColumnDescriptor(family);
747 htd.addFamily(hcd);
748
749 Path[] hfiles = FileUtil.stat2Paths(fs.listStatus(familyDir));
750 for (Path hfile : hfiles) {
751 if (hfile.getName().startsWith("_")) continue;
752 HFile.Reader reader = HFile.createReader(fs, hfile,
753 new CacheConfig(getConf()));
754 final byte[] first, last;
755 try {
756 if (hcd.getCompressionType() != reader.getCompressionAlgorithm()) {
757 hcd.setCompressionType(reader.getCompressionAlgorithm());
758 LOG.info("Setting compression " + hcd.getCompressionType().name() +
759 " for family " + hcd.toString());
760 }
761 reader.loadFileInfo();
762 first = reader.getFirstRowKey();
763 last = reader.getLastRowKey();
764
765 LOG.info("Trying to figure out region boundaries hfile=" + hfile +
766 " first=" + Bytes.toStringBinary(first) +
767 " last=" + Bytes.toStringBinary(last));
768
769
770 Integer value = map.containsKey(first)? map.get(first):0;
771 map.put(first, value+1);
772
773 value = map.containsKey(last)? map.get(last):0;
774 map.put(last, value-1);
775 } finally {
776 reader.close();
777 }
778 }
779 }
780
781 keys = LoadIncrementalHFiles.inferBoundaries(map);
782 this.hbAdmin.createTable(htd,keys);
783
784 LOG.info("Table "+ tableName +" is available!!");
785 }
786
787 @Override
788 public int run(String[] args) throws Exception {
789 if (args.length != 2) {
790 usage();
791 return -1;
792 }
793
794 String dirPath = args[0];
795 String tableName = args[1];
796
797 boolean tableExists = this.doesTableExist(tableName);
798 if (!tableExists) this.createTable(tableName,dirPath);
799
800 Path hfofDir = new Path(dirPath);
801 HTable table = new HTable(this.cfg, tableName);
802
803 doBulkLoad(hfofDir, table);
804 return 0;
805 }
806
807 public static void main(String[] args) throws Exception {
808 int ret = ToolRunner.run(new LoadIncrementalHFiles(HBaseConfiguration.create()), args);
809 System.exit(ret);
810 }
811
812 }