1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.mapreduce;
20
21 import static java.lang.String.format;
22
23 import com.google.common.collect.HashMultimap;
24 import com.google.common.collect.Multimap;
25 import com.google.common.collect.Multimaps;
26 import com.google.common.util.concurrent.ThreadFactoryBuilder;
27
28 import org.apache.commons.lang.mutable.MutableInt;
29 import org.apache.commons.logging.Log;
30 import org.apache.commons.logging.LogFactory;
31 import org.apache.hadoop.conf.Configuration;
32 import org.apache.hadoop.conf.Configured;
33 import org.apache.hadoop.fs.FileStatus;
34 import org.apache.hadoop.fs.FileSystem;
35 import org.apache.hadoop.fs.Path;
36 import org.apache.hadoop.fs.permission.FsPermission;
37 import org.apache.hadoop.hbase.HBaseConfiguration;
38 import org.apache.hadoop.hbase.HColumnDescriptor;
39 import org.apache.hadoop.hbase.HConstants;
40 import org.apache.hadoop.hbase.HTableDescriptor;
41 import org.apache.hadoop.hbase.KeyValue;
42 import org.apache.hadoop.hbase.KeyValueUtil;
43 import org.apache.hadoop.hbase.TableName;
44 import org.apache.hadoop.hbase.TableNotFoundException;
45 import org.apache.hadoop.hbase.classification.InterfaceAudience;
46 import org.apache.hadoop.hbase.classification.InterfaceStability;
47 import org.apache.hadoop.hbase.client.Admin;
48 import org.apache.hadoop.hbase.client.ClusterConnection;
49 import org.apache.hadoop.hbase.client.Connection;
50 import org.apache.hadoop.hbase.client.ConnectionFactory;
51 import org.apache.hadoop.hbase.client.HBaseAdmin;
52 import org.apache.hadoop.hbase.client.HConnection;
53 import org.apache.hadoop.hbase.client.HTable;
54 import org.apache.hadoop.hbase.client.NeedUnmanagedConnectionException;
55 import org.apache.hadoop.hbase.client.RegionLocator;
56 import org.apache.hadoop.hbase.client.RegionServerCallable;
57 import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
58 import org.apache.hadoop.hbase.client.Table;
59 import org.apache.hadoop.hbase.client.coprocessor.SecureBulkLoadClient;
60 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
61 import org.apache.hadoop.hbase.io.HFileLink;
62 import org.apache.hadoop.hbase.io.HalfStoreFileReader;
63 import org.apache.hadoop.hbase.io.Reference;
64 import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
65 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
66 import org.apache.hadoop.hbase.io.hfile.HFile;
67 import org.apache.hadoop.hbase.io.hfile.HFileContext;
68 import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
69 import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
70 import org.apache.hadoop.hbase.io.hfile.HFileScanner;
71 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
72 import org.apache.hadoop.hbase.regionserver.BloomType;
73 import org.apache.hadoop.hbase.regionserver.HStore;
74 import org.apache.hadoop.hbase.regionserver.StoreFile;
75 import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
76 import org.apache.hadoop.hbase.security.UserProvider;
77 import org.apache.hadoop.hbase.security.token.FsDelegationToken;
78 import org.apache.hadoop.hbase.util.Bytes;
79 import org.apache.hadoop.hbase.util.FSHDFSUtils;
80 import org.apache.hadoop.hbase.util.Pair;
81 import org.apache.hadoop.util.Tool;
82 import org.apache.hadoop.util.ToolRunner;
83
84 import java.io.FileNotFoundException;
85 import java.io.IOException;
86 import java.io.InterruptedIOException;
87 import java.nio.ByteBuffer;
88 import java.util.ArrayList;
89 import java.util.Arrays;
90 import java.util.Collection;
91 import java.util.Deque;
92 import java.util.HashMap;
93 import java.util.HashSet;
94 import java.util.Iterator;
95 import java.util.LinkedList;
96 import java.util.List;
97 import java.util.Map;
98 import java.util.Map.Entry;
99 import java.util.Set;
100 import java.util.TreeMap;
101 import java.util.UUID;
102 import java.util.concurrent.Callable;
103 import java.util.concurrent.ExecutionException;
104 import java.util.concurrent.ExecutorService;
105 import java.util.concurrent.Future;
106 import java.util.concurrent.LinkedBlockingQueue;
107 import java.util.concurrent.ThreadPoolExecutor;
108 import java.util.concurrent.TimeUnit;
109
110
111
112
113
114 @InterfaceAudience.Public
115 @InterfaceStability.Stable
116 public class LoadIncrementalHFiles extends Configured implements Tool {
/** Logger shared by all instances of this tool. */
private static final Log LOG = LogFactory.getLog(LoadIncrementalHFiles.class);
/** Admin handle; created by {@code initialize()} and used for table existence checks and creation. */
private Admin hbAdmin;

/** Tool name printed in the usage message. */
public static final String NAME = "completebulkload";
/** Configuration key limiting how many HFiles may be loaded into one family of one region. */
public static final String MAX_FILES_PER_REGION_PER_FAMILY
    = "hbase.mapreduce.bulkload.max.hfiles.perRegion.perFamily";
/** Configuration key read into {@link #assignSeqIds}; defaults to {@code true}. */
private static final String ASSIGN_SEQ_IDS = "hbase.mapreduce.bulkload.assign.sequenceNumbers";
/** Configuration key checked by {@code run()}: any value other than "yes" disables table creation. */
public final static String CREATE_TABLE_CONF_KEY = "create.table";

/** Value of {@link #MAX_FILES_PER_REGION_PER_FAMILY}; defaults to 32. */
private int maxFilesPerRegionPerFamily;
/** Passed to the region server with each non-secure bulk load request. */
private boolean assignSeqIds;

// File system of the bulk load directory; assigned by discoverLoadQueue().
private FileSystem fs;
// Delegation token helper; acquired and released around each bulk load.
private FsDelegationToken fsDelegationToken;
// Token returned by SecureBulkLoadClient.prepareBulkLoad(); only set when the secure endpoint is configured.
private String bulkToken;
// Source of the user identity handed to FsDelegationToken.
private UserProvider userProvider;
135
/**
 * No-argument constructor used by {@code main}; it performs no setup itself, leaving that to
 * {@code run()}, which calls {@code initialize()}.
 */
private LoadIncrementalHFiles() {}
137
/**
 * Creates a loader bound to the given configuration and initializes it immediately.
 *
 * @param conf configuration handed to {@link Configured}
 * @throws Exception if {@code initialize()} fails
 */
public LoadIncrementalHFiles(Configuration conf) throws Exception {
  super(conf);
  initialize();
}
142
143 private void initialize() throws Exception {
144 if (hbAdmin == null) {
145
146 setConf(HBaseConfiguration.create(getConf()));
147 Configuration conf = getConf();
148
149 conf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0);
150 this.hbAdmin = new HBaseAdmin(conf);
151 this.userProvider = UserProvider.instantiate(conf);
152 this.fsDelegationToken = new FsDelegationToken(userProvider, "renewer");
153 assignSeqIds = conf.getBoolean(ASSIGN_SEQ_IDS, true);
154 maxFilesPerRegionPerFamily = conf.getInt(MAX_FILES_PER_REGION_PER_FAMILY, 32);
155 }
156 }
157
158 private void usage() {
159 System.err.println("usage: " + NAME + " /path/to/hfileoutputformat-output tablename" + "\n -D"
160 + CREATE_TABLE_CONF_KEY + "=no - can be used to avoid creation of table by this tool\n"
161 + " Note: if you set this to 'no', then the target table must already exist in HBase\n"
162 + "\n");
163 }
164
165 private static interface BulkHFileVisitor<TFamily> {
166 TFamily bulkFamily(final byte[] familyName)
167 throws IOException;
168 void bulkHFile(final TFamily family, final FileStatus hfileStatus)
169 throws IOException;
170 }
171
172
173
174
175
176 private static <TFamily> void visitBulkHFiles(final FileSystem fs, final Path bulkDir,
177 final BulkHFileVisitor<TFamily> visitor) throws IOException {
178 if (!fs.exists(bulkDir)) {
179 throw new FileNotFoundException("Bulkload dir " + bulkDir + " not found");
180 }
181
182 FileStatus[] familyDirStatuses = fs.listStatus(bulkDir);
183 if (familyDirStatuses == null) {
184 throw new FileNotFoundException("No families found in " + bulkDir);
185 }
186
187 for (FileStatus familyStat : familyDirStatuses) {
188 if (!familyStat.isDirectory()) {
189 LOG.warn("Skipping non-directory " + familyStat.getPath());
190 continue;
191 }
192 Path familyDir = familyStat.getPath();
193 byte[] familyName = familyDir.getName().getBytes();
194 TFamily family = visitor.bulkFamily(familyName);
195
196 FileStatus[] hfileStatuses = fs.listStatus(familyDir);
197 for (FileStatus hfileStatus : hfileStatuses) {
198 if (!fs.isFile(hfileStatus.getPath())) {
199 LOG.warn("Skipping non-file " + hfileStatus);
200 continue;
201 }
202
203 Path hfile = hfileStatus.getPath();
204
205 String fileName = hfile.getName();
206 if (fileName.startsWith("_")) {
207 continue;
208 }
209 if (StoreFileInfo.isReference(fileName)) {
210 LOG.warn("Skipping reference " + fileName);
211 continue;
212 }
213 if (HFileLink.isHFileLink(fileName)) {
214 LOG.warn("Skipping HFileLink " + fileName);
215 continue;
216 }
217
218
219 try {
220 if (!HFile.isHFileFormat(fs, hfile)) {
221 LOG.warn("the file " + hfile + " doesn't seems to be an hfile. skipping");
222 continue;
223 }
224 } catch (FileNotFoundException e) {
225 LOG.warn("the file " + hfile + " was removed");
226 continue;
227 }
228
229 visitor.bulkHFile(family, hfileStatus);
230 }
231 }
232 }
233
234
235
236
237
238
239
240
241
242 static class LoadQueueItem {
243 final byte[] family;
244 final Path hfilePath;
245
246 public LoadQueueItem(byte[] family, Path hfilePath) {
247 this.family = family;
248 this.hfilePath = hfilePath;
249 }
250
251 @Override
252 public String toString() {
253 return "family:"+ Bytes.toString(family) + " path:" + hfilePath.toString();
254 }
255 }
256
257
258
259
260
261 private void discoverLoadQueue(final Deque<LoadQueueItem> ret, final Path hfofDir)
262 throws IOException {
263 fs = hfofDir.getFileSystem(getConf());
264 visitBulkHFiles(fs, hfofDir, new BulkHFileVisitor<byte[]>() {
265 @Override
266 public byte[] bulkFamily(final byte[] familyName) {
267 return familyName;
268 }
269 @Override
270 public void bulkHFile(final byte[] family, final FileStatus hfile) throws IOException {
271 long length = hfile.getLen();
272 if (length > getConf().getLong(HConstants.HREGION_MAX_FILESIZE,
273 HConstants.DEFAULT_MAX_FILE_SIZE)) {
274 LOG.warn("Trying to bulk load hfile " + hfile.getPath() + " with size: " +
275 length + " bytes can be problematic as it may lead to oversplitting.");
276 }
277 ret.add(new LoadQueueItem(family, hfile.getPath()));
278 }
279 });
280 }
281
282
283
284
285
286
287
288
289
290
291
292
293
294 @SuppressWarnings("deprecation")
295 @Deprecated
296 public void doBulkLoad(Path hfofDir, final HTable table)
297 throws TableNotFoundException, IOException
298 {
299 Admin admin = null;
300 Table t = table;
301 Connection conn = table.getConnection();
302 boolean closeConnWhenFinished = false;
303 try {
304 if (conn instanceof ClusterConnection && ((ClusterConnection) conn).isManaged()) {
305 LOG.warn("managed connection cannot be used for bulkload. Creating unmanaged connection.");
306
307 conn = ConnectionFactory.createConnection(table.getConfiguration());
308 t = conn.getTable(table.getName());
309 closeConnWhenFinished = true;
310 if (conn instanceof ClusterConnection && ((ClusterConnection) conn).isManaged()) {
311 throw new RuntimeException("Failed to create unmanaged connection.");
312 }
313 admin = conn.getAdmin();
314 } else {
315 admin = conn.getAdmin();
316 }
317 try (RegionLocator rl = conn.getRegionLocator(t.getName())) {
318 doBulkLoad(hfofDir, admin, t, rl);
319 }
320 } finally {
321 if (admin != null) admin.close();
322 if (closeConnWhenFinished) {
323 t.close();
324 conn.close();
325 }
326 }
327 }
328
329
330
331
332
333
334
335
336
337
338 @SuppressWarnings("deprecation")
339 public void doBulkLoad(Path hfofDir, final Admin admin, Table table,
340 RegionLocator regionLocator) throws TableNotFoundException, IOException {
341
342 if (!admin.isTableAvailable(regionLocator.getName())) {
343 throw new TableNotFoundException("Table " + table.getName() + " is not currently available.");
344 }
345
346
347 int nrThreads = getConf().getInt("hbase.loadincremental.threads.max",
348 Runtime.getRuntime().availableProcessors());
349 ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
350 builder.setNameFormat("LoadIncrementalHFiles-%1$d");
351 ExecutorService pool = new ThreadPoolExecutor(nrThreads, nrThreads,
352 60, TimeUnit.SECONDS,
353 new LinkedBlockingQueue<Runnable>(),
354 builder.build());
355 ((ThreadPoolExecutor)pool).allowCoreThreadTimeOut(true);
356
357
358
359 Deque<LoadQueueItem> queue = new LinkedList<LoadQueueItem>();
360 try {
361 discoverLoadQueue(queue, hfofDir);
362
363 Collection<HColumnDescriptor> families = table.getTableDescriptor().getFamilies();
364 ArrayList<String> familyNames = new ArrayList<String>(families.size());
365 for (HColumnDescriptor family : families) {
366 familyNames.add(family.getNameAsString());
367 }
368 ArrayList<String> unmatchedFamilies = new ArrayList<String>();
369 Iterator<LoadQueueItem> queueIter = queue.iterator();
370 while (queueIter.hasNext()) {
371 LoadQueueItem lqi = queueIter.next();
372 String familyNameInHFile = Bytes.toString(lqi.family);
373 if (!familyNames.contains(familyNameInHFile)) {
374 unmatchedFamilies.add(familyNameInHFile);
375 }
376 }
377 if (unmatchedFamilies.size() > 0) {
378 String msg =
379 "Unmatched family names found: unmatched family names in HFiles to be bulkloaded: "
380 + unmatchedFamilies + "; valid family names of table "
381 + table.getName() + " are: " + familyNames;
382 LOG.error(msg);
383 throw new IOException(msg);
384 }
385 int count = 0;
386
387 if (queue.isEmpty()) {
388 LOG.warn("Bulk load operation did not find any files to load in " +
389 "directory " + hfofDir.toUri() + ". Does it contain files in " +
390 "subdirectories that correspond to column family names?");
391 return;
392 }
393
394
395
396
397 fsDelegationToken.acquireDelegationToken(fs);
398 if(isSecureBulkLoadEndpointAvailable()) {
399 bulkToken = new SecureBulkLoadClient(table).prepareBulkLoad(table.getName());
400 }
401
402
403 while (!queue.isEmpty()) {
404
405 final Pair<byte[][], byte[][]> startEndKeys = regionLocator.getStartEndKeys();
406 if (count != 0) {
407 LOG.info("Split occured while grouping HFiles, retry attempt " +
408 + count + " with " + queue.size() + " files remaining to group or split");
409 }
410
411 int maxRetries = getConf().getInt("hbase.bulkload.retries.number", 10);
412 if (maxRetries != 0 && count >= maxRetries) {
413 throw new IOException("Retry attempted " + count +
414 " times without completing, bailing out");
415 }
416 count++;
417
418
419 Multimap<ByteBuffer, LoadQueueItem> regionGroups = groupOrSplitPhase(table,
420 pool, queue, startEndKeys);
421
422 if (!checkHFilesCountPerRegionPerFamily(regionGroups)) {
423
424 throw new IOException("Trying to load more than " + maxFilesPerRegionPerFamily
425 + " hfiles to one family of one region");
426 }
427
428 bulkLoadPhase(table, admin.getConnection(), pool, queue, regionGroups);
429
430
431
432
433 }
434
435 } finally {
436 fsDelegationToken.releaseDelegationToken();
437 if(bulkToken != null) {
438 new SecureBulkLoadClient(table).cleanupBulkLoad(bulkToken);
439 }
440 pool.shutdown();
441 if (queue != null && !queue.isEmpty()) {
442 StringBuilder err = new StringBuilder();
443 err.append("-------------------------------------------------\n");
444 err.append("Bulk load aborted with some files not yet loaded:\n");
445 err.append("-------------------------------------------------\n");
446 for (LoadQueueItem q : queue) {
447 err.append(" ").append(q.hfilePath).append('\n');
448 }
449 LOG.error(err);
450 }
451 }
452
453 if (queue != null && !queue.isEmpty()) {
454 throw new RuntimeException("Bulk load aborted with some files not yet loaded."
455 + "Please check log for more details.");
456 }
457 }
458
459
460
461
462
463
464 protected void bulkLoadPhase(final Table table, final Connection conn,
465 ExecutorService pool, Deque<LoadQueueItem> queue,
466 final Multimap<ByteBuffer, LoadQueueItem> regionGroups) throws IOException {
467
468 Set<Future<List<LoadQueueItem>>> loadingFutures = new HashSet<Future<List<LoadQueueItem>>>();
469 for (Entry<ByteBuffer, ? extends Collection<LoadQueueItem>> e: regionGroups.asMap().entrySet()) {
470 final byte[] first = e.getKey().array();
471 final Collection<LoadQueueItem> lqis = e.getValue();
472
473 final Callable<List<LoadQueueItem>> call = new Callable<List<LoadQueueItem>>() {
474 @Override
475 public List<LoadQueueItem> call() throws Exception {
476 List<LoadQueueItem> toRetry =
477 tryAtomicRegionLoad(conn, table.getName(), first, lqis);
478 return toRetry;
479 }
480 };
481 loadingFutures.add(pool.submit(call));
482 }
483
484
485 for (Future<List<LoadQueueItem>> future : loadingFutures) {
486 try {
487 List<LoadQueueItem> toRetry = future.get();
488
489
490 queue.addAll(toRetry);
491
492 } catch (ExecutionException e1) {
493 Throwable t = e1.getCause();
494 if (t instanceof IOException) {
495
496
497 throw new IOException("BulkLoad encountered an unrecoverable problem", t);
498 }
499 LOG.error("Unexpected execution exception during bulk load", e1);
500 throw new IllegalStateException(t);
501 } catch (InterruptedException e1) {
502 LOG.error("Unexpected interrupted exception during bulk load", e1);
503 throw (InterruptedIOException)new InterruptedIOException().initCause(e1);
504 }
505 }
506 }
507
508 private boolean checkHFilesCountPerRegionPerFamily(
509 final Multimap<ByteBuffer, LoadQueueItem> regionGroups) {
510 for (Entry<ByteBuffer,
511 ? extends Collection<LoadQueueItem>> e: regionGroups.asMap().entrySet()) {
512 final Collection<LoadQueueItem> lqis = e.getValue();
513 HashMap<byte[], MutableInt> filesMap = new HashMap<byte[], MutableInt>();
514 for (LoadQueueItem lqi: lqis) {
515 MutableInt count = filesMap.get(lqi.family);
516 if (count == null) {
517 count = new MutableInt();
518 filesMap.put(lqi.family, count);
519 }
520 count.increment();
521 if (count.intValue() > maxFilesPerRegionPerFamily) {
522 LOG.error("Trying to load more than " + maxFilesPerRegionPerFamily
523 + " hfiles to family " + Bytes.toStringBinary(lqi.family)
524 + " of region with start key "
525 + Bytes.toStringBinary(e.getKey()));
526 return false;
527 }
528 }
529 }
530 return true;
531 }
532
533
534
535
536
537 private Multimap<ByteBuffer, LoadQueueItem> groupOrSplitPhase(final Table table,
538 ExecutorService pool, Deque<LoadQueueItem> queue,
539 final Pair<byte[][], byte[][]> startEndKeys) throws IOException {
540
541
542 Multimap<ByteBuffer, LoadQueueItem> rgs = HashMultimap.create();
543 final Multimap<ByteBuffer, LoadQueueItem> regionGroups = Multimaps.synchronizedMultimap(rgs);
544
545
546 Set<Future<List<LoadQueueItem>>> splittingFutures = new HashSet<Future<List<LoadQueueItem>>>();
547 while (!queue.isEmpty()) {
548 final LoadQueueItem item = queue.remove();
549
550 final Callable<List<LoadQueueItem>> call = new Callable<List<LoadQueueItem>>() {
551 @Override
552 public List<LoadQueueItem> call() throws Exception {
553 List<LoadQueueItem> splits = groupOrSplit(regionGroups, item, table, startEndKeys);
554 return splits;
555 }
556 };
557 splittingFutures.add(pool.submit(call));
558 }
559
560
561 for (Future<List<LoadQueueItem>> lqis : splittingFutures) {
562 try {
563 List<LoadQueueItem> splits = lqis.get();
564 if (splits != null) {
565 queue.addAll(splits);
566 }
567 } catch (ExecutionException e1) {
568 Throwable t = e1.getCause();
569 if (t instanceof IOException) {
570 LOG.error("IOException during splitting", e1);
571 throw (IOException)t;
572 }
573 LOG.error("Unexpected execution exception during splitting", e1);
574 throw new IllegalStateException(t);
575 } catch (InterruptedException e1) {
576 LOG.error("Unexpected interrupted exception during splitting", e1);
577 throw (InterruptedIOException)new InterruptedIOException().initCause(e1);
578 }
579 }
580 return regionGroups;
581 }
582
583
584 private String getUniqueName() {
585 return UUID.randomUUID().toString().replaceAll("-", "");
586 }
587
588 protected List<LoadQueueItem> splitStoreFile(final LoadQueueItem item,
589 final Table table, byte[] startKey,
590 byte[] splitKey) throws IOException {
591 final Path hfilePath = item.hfilePath;
592
593
594
595 final Path tmpDir = new Path(item.hfilePath.getParent(), "_tmp");
596
597 LOG.info("HFile at " + hfilePath + " no longer fits inside a single " +
598 "region. Splitting...");
599
600 String uniqueName = getUniqueName();
601 HColumnDescriptor familyDesc = table.getTableDescriptor().getFamily(item.family);
602 Path botOut = new Path(tmpDir, uniqueName + ".bottom");
603 Path topOut = new Path(tmpDir, uniqueName + ".top");
604 splitStoreFile(getConf(), hfilePath, familyDesc, splitKey,
605 botOut, topOut);
606
607 FileSystem fs = tmpDir.getFileSystem(getConf());
608 fs.setPermission(tmpDir, FsPermission.valueOf("-rwxrwxrwx"));
609 fs.setPermission(botOut, FsPermission.valueOf("-rwxrwxrwx"));
610 fs.setPermission(topOut, FsPermission.valueOf("-rwxrwxrwx"));
611
612
613
614 List<LoadQueueItem> lqis = new ArrayList<LoadQueueItem>(2);
615 lqis.add(new LoadQueueItem(item.family, botOut));
616 lqis.add(new LoadQueueItem(item.family, topOut));
617
618 LOG.info("Successfully split into new HFiles " + botOut + " and " + topOut);
619 return lqis;
620 }
621
622
623
624
625
626
627
628
629
630
/**
 * Decides what to do with a single HFile: if all of its rows fall inside one region it is
 * added to {@code regionGroups} under that region's start key; otherwise it is split at the
 * end key of the region containing its first row.
 *
 * @param regionGroups receives the item when it fits inside a single region
 * @param item the HFile to examine
 * @param table table being loaded; used in error messages and when splitting
 * @param startEndKeys region start keys (first) and end keys (second) of the table
 * @return the two halves if the file was split; {@code null} if it was grouped or is empty
 * @throws IOException if the file cannot be read or the region boundaries are inconsistent
 */
protected List<LoadQueueItem> groupOrSplit(Multimap<ByteBuffer, LoadQueueItem> regionGroups,
    final LoadQueueItem item, final Table table,
    final Pair<byte[][], byte[][]> startEndKeys)
    throws IOException {
  final Path hfilePath = item.hfilePath;
  // Open the file only long enough to read its first and last row keys.
  HFile.Reader hfr = HFile.createReader(fs, hfilePath,
      new CacheConfig(getConf()), getConf());
  final byte[] first, last;
  try {
    hfr.loadFileInfo();
    first = hfr.getFirstRowKey();
    last = hfr.getLastRowKey();
  } finally {
    hfr.close();
  }

  LOG.info("Trying to load hfile=" + hfilePath +
      " first=" + Bytes.toStringBinary(first) +
      " last=" + Bytes.toStringBinary(last));
  if (first == null || last == null) {
    assert first == null && last == null;
    // Nothing to load from an empty file.
    LOG.info("hfile " + hfilePath + " has no entries, skipping");
    return null;
  }
  if (Bytes.compareTo(first, last) > 0) {
    throw new IllegalArgumentException(
        "Invalid range: " + Bytes.toStringBinary(first) +
        " > " + Bytes.toStringBinary(last));
  }
  // Locate the region whose start key is the greatest one <= first.
  int idx = Arrays.binarySearch(startEndKeys.getFirst(), first,
      Bytes.BYTES_COMPARATOR);
  if (idx < 0) {
    // Not an exact match: binarySearch returned -(insertion point) - 1, so step back to the
    // region just before the insertion point.
    idx = -(idx + 1) - 1;
  }
  final int indexForCallable = idx;

  // Sanity-check the region boundaries around the chosen region:
  //  - a negative index means no region starts at or before the first row;
  //  - the last region must have an empty end key;
  //  - the end key of this region must equal the start key of the next one.
  if (indexForCallable < 0) {
    throw new IOException("The first region info for table "
        + table.getName()
        + " cann't be found in hbase:meta.Please use hbck tool to fix it first.");
  } else if ((indexForCallable == startEndKeys.getFirst().length - 1)
      && !Bytes.equals(startEndKeys.getSecond()[indexForCallable], HConstants.EMPTY_BYTE_ARRAY)) {
    throw new IOException("The last region info for table "
        + table.getName()
        + " cann't be found in hbase:meta.Please use hbck tool to fix it first.");
  } else if (indexForCallable + 1 < startEndKeys.getFirst().length
      && !(Bytes.compareTo(startEndKeys.getSecond()[indexForCallable],
        startEndKeys.getFirst()[indexForCallable + 1]) == 0)) {
    throw new IOException("The endkey of one region for table "
        + table.getName()
        + " is not equal to the startkey of the next region in hbase:meta."
        + "Please use hbck tool to fix it first.");
  }

  // The file fits if its last row is before the region's end key, or the region is unbounded.
  boolean lastKeyInRange =
      Bytes.compareTo(last, startEndKeys.getSecond()[idx]) < 0 ||
      Bytes.equals(startEndKeys.getSecond()[idx], HConstants.EMPTY_BYTE_ARRAY);
  if (!lastKeyInRange) {
    // Split at the region's end key; the halves are re-examined in a later round.
    List<LoadQueueItem> lqis = splitStoreFile(item, table,
        startEndKeys.getFirst()[indexForCallable],
        startEndKeys.getSecond()[indexForCallable]);
    return lqis;
  }

  // Group the file under the start key of the region that contains it.
  regionGroups.put(ByteBuffer.wrap(startEndKeys.getFirst()[idx]), item);
  return null;
}
707
708
709
710
711
712
713
/**
 * Variant taking an {@link HConnection} and a raw table name; converts the name to a
 * {@link TableName} and delegates to
 * {@link #tryAtomicRegionLoad(Connection, TableName, byte[], Collection)}.
 *
 * @param conn connection to use
 * @param tableName name of the table as bytes
 * @param first a row key inside the target region
 * @param lqis HFiles to load into that region
 * @return the items to retry; empty if the load succeeded
 * @throws IOException if the load fails unrecoverably
 * @deprecated use {@link #tryAtomicRegionLoad(Connection, TableName, byte[], Collection)}
 */
@Deprecated
protected List<LoadQueueItem> tryAtomicRegionLoad(final HConnection conn,
    final byte [] tableName, final byte[] first, Collection<LoadQueueItem> lqis)
    throws IOException {
  return tryAtomicRegionLoad(conn, TableName.valueOf(tableName), first, lqis);
}
720
721
722
723
724
725
726
727
728
729
730
731
732
733
/**
 * Asks the region server hosting the row {@code first} to bulk load all of {@code lqis} in
 * one request. The request goes through the secure bulk load endpoint when it is configured,
 * and through the plain bulk load RPC otherwise.
 *
 * @param conn connection used to reach the region server
 * @param tableName table being loaded
 * @param first a row key inside the target region
 * @param lqis HFiles to load into that region
 * @return empty list if the load succeeded; all of {@code lqis} if the server reported a
 *         failure, so that the caller can retry them
 * @throws IOException if the call fails with an IOException after retries
 */
protected List<LoadQueueItem> tryAtomicRegionLoad(final Connection conn,
    final TableName tableName, final byte[] first, Collection<LoadQueueItem> lqis)
    throws IOException {
  // Flatten the queue items into (family, path) pairs as expected by the RPC layer.
  final List<Pair<byte[], String>> famPaths =
    new ArrayList<Pair<byte[], String>>(lqis.size());
  for (LoadQueueItem lqi : lqis) {
    famPaths.add(Pair.newPair(lqi.family, lqi.hfilePath.toString()));
  }

  final RegionServerCallable<Boolean> svrCallable =
      new RegionServerCallable<Boolean>(conn, tableName, first) {
    @Override
    public Boolean call(int callTimeout) throws Exception {
      SecureBulkLoadClient secureClient = null;
      boolean success = false;

      try {
        LOG.debug("Going to connect to server " + getLocation() + " for row "
            + Bytes.toStringBinary(getRow()) + " with hfile group " + famPaths);
        byte[] regionName = getLocation().getRegionInfo().getRegionName();
        if (!isSecureBulkLoadEndpointAvailable()) {
          success = ProtobufUtil.bulkLoadHFile(getStub(), famPaths, regionName, assignSeqIds);
        } else {
          try (Table table = conn.getTable(getTableName())) {
            secureClient = new SecureBulkLoadClient(table);
            success = secureClient.bulkLoadHFiles(famPaths, fsDelegationToken.getUserToken(),
              bulkToken, getLocation().getRegionInfo().getStartKey());
          }
        }
        return success;
      } finally {
        // A failed secure load may have left files in the staging directory. Try to move
        // them back to their original paths so that a retry can find them. This is best
        // effort: failures are only logged at debug level.
        if(secureClient != null && !success) {
          FileSystem targetFs = FileSystem.get(getConf());
          // Only attempted when source and target are the same HDFS, per FSHDFSUtils.
          if (FSHDFSUtils.isSameHdfs(getConf(), fs, targetFs)) {
            for(Pair<byte[], String> el : famPaths) {
              Path hfileStagingPath = null;
              Path hfileOrigPath = new Path(el.getSecond());
              try {
                hfileStagingPath= new Path(secureClient.getStagingPath(bulkToken, el.getFirst()),
                  hfileOrigPath.getName());
                if(targetFs.rename(hfileStagingPath, hfileOrigPath)) {
                  LOG.debug("Moved back file " + hfileOrigPath + " from " +
                      hfileStagingPath);
                } else if(targetFs.exists(hfileStagingPath)){
                  LOG.debug("Unable to move back file " + hfileOrigPath + " from " +
                      hfileStagingPath);
                }
              } catch(Exception ex) {
                LOG.debug("Unable to move back file " + hfileOrigPath + " from " +
                    hfileStagingPath, ex);
              }
            }
          }
        }
      }
    }
  };

  try {
    List<LoadQueueItem> toRetry = new ArrayList<LoadQueueItem>();
    Configuration conf = getConf();
    boolean success = RpcRetryingCallerFactory.instantiate(conf,
        null).<Boolean> newCaller()
        .callWithRetries(svrCallable, Integer.MAX_VALUE);
    if (!success) {
      // The server declined the load; hand everything back to the caller for another round.
      LOG.warn("Attempt to bulk load region containing "
          + Bytes.toStringBinary(first) + " into table "
          + tableName  + " with files " + lqis
          + " failed.  This is recoverable and they will be retried.");
      toRetry.addAll(lqis);
    }
    // success
    return toRetry;
  } catch (IOException e) {
    LOG.error("Encountered unrecoverable error from region server, additional details: "
        + svrCallable.getExceptionMessageAdditionalDetail(), e);
    throw e;
  }
}
819
820 private boolean isSecureBulkLoadEndpointAvailable() {
821 String classes = getConf().get(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, "");
822 return classes.contains("org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint");
823 }
824
825
826
827
828
829 static void splitStoreFile(
830 Configuration conf, Path inFile,
831 HColumnDescriptor familyDesc, byte[] splitKey,
832 Path bottomOut, Path topOut) throws IOException
833 {
834
835 Reference topReference = Reference.createTopReference(splitKey);
836 Reference bottomReference = Reference.createBottomReference(splitKey);
837
838 copyHFileHalf(conf, inFile, topOut, topReference, familyDesc);
839 copyHFileHalf(conf, inFile, bottomOut, bottomReference, familyDesc);
840 }
841
842
843
844
845 private static void copyHFileHalf(
846 Configuration conf, Path inFile, Path outFile, Reference reference,
847 HColumnDescriptor familyDescriptor)
848 throws IOException {
849 FileSystem fs = inFile.getFileSystem(conf);
850 CacheConfig cacheConf = new CacheConfig(conf);
851 HalfStoreFileReader halfReader = null;
852 StoreFile.Writer halfWriter = null;
853 try {
854 halfReader = new HalfStoreFileReader(fs, inFile, cacheConf, reference, conf);
855 Map<byte[], byte[]> fileInfo = halfReader.loadFileInfo();
856
857 int blocksize = familyDescriptor.getBlocksize();
858 Algorithm compression = familyDescriptor.getCompression();
859 BloomType bloomFilterType = familyDescriptor.getBloomFilterType();
860 HFileContext hFileContext = new HFileContextBuilder()
861 .withCompression(compression)
862 .withChecksumType(HStore.getChecksumType(conf))
863 .withBytesPerCheckSum(HStore.getBytesPerChecksum(conf))
864 .withBlockSize(blocksize)
865 .withDataBlockEncoding(familyDescriptor.getDataBlockEncoding())
866 .withIncludesTags(true)
867 .build();
868 halfWriter = new StoreFile.WriterBuilder(conf, cacheConf,
869 fs)
870 .withFilePath(outFile)
871 .withBloomType(bloomFilterType)
872 .withFileContext(hFileContext)
873 .build();
874 HFileScanner scanner = halfReader.getScanner(false, false, false);
875 scanner.seekTo();
876 do {
877 KeyValue kv = KeyValueUtil.ensureKeyValue(scanner.getKeyValue());
878 halfWriter.append(kv);
879 } while (scanner.next());
880
881 for (Map.Entry<byte[],byte[]> entry : fileInfo.entrySet()) {
882 if (shouldCopyHFileMetaKey(entry.getKey())) {
883 halfWriter.appendFileInfo(entry.getKey(), entry.getValue());
884 }
885 }
886 } finally {
887 if (halfWriter != null) halfWriter.close();
888 if (halfReader != null) halfReader.close(cacheConf.shouldEvictOnClose());
889 }
890 }
891
892 private static boolean shouldCopyHFileMetaKey(byte[] key) {
893
894 if (Bytes.equals(key, HFileDataBlockEncoder.DATA_BLOCK_ENCODING)) {
895 return false;
896 }
897
898 return !HFile.isReservedFileInfoKey(key);
899 }
900
/**
 * Asks the admin handle whether the table exists.
 *
 * @param tableName table to look up
 * @return the result of {@code hbAdmin.tableExists(tableName)}
 * @throws Exception if the admin call fails
 */
private boolean doesTableExist(TableName tableName) throws Exception {
  return hbAdmin.tableExists(tableName);
}
904
905
906
907
908
909
910
911
912
913
914
915
916
917 public static byte[][] inferBoundaries(TreeMap<byte[], Integer> bdryMap) {
918 ArrayList<byte[]> keysArray = new ArrayList<byte[]>();
919 int runningValue = 0;
920 byte[] currStartKey = null;
921 boolean firstBoundary = true;
922
923 for (Map.Entry<byte[], Integer> item: bdryMap.entrySet()) {
924 if (runningValue == 0) currStartKey = item.getKey();
925 runningValue += item.getValue();
926 if (runningValue == 0) {
927 if (!firstBoundary) keysArray.add(currStartKey);
928 firstBoundary = false;
929 }
930 }
931
932 return keysArray.toArray(new byte[0][0]);
933 }
934
935
936
937
938
939 private void createTable(TableName tableName, String dirPath) throws Exception {
940 final Path hfofDir = new Path(dirPath);
941 final FileSystem fs = hfofDir.getFileSystem(getConf());
942
943
944
945 final HTableDescriptor htd = new HTableDescriptor(tableName);
946 final TreeMap<byte[], Integer> map = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
947 visitBulkHFiles(fs, hfofDir, new BulkHFileVisitor<HColumnDescriptor>() {
948 @Override
949 public HColumnDescriptor bulkFamily(final byte[] familyName) {
950 HColumnDescriptor hcd = new HColumnDescriptor(familyName);
951 htd.addFamily(hcd);
952 return hcd;
953 }
954 @Override
955 public void bulkHFile(final HColumnDescriptor hcd, final FileStatus hfileStatus)
956 throws IOException {
957 Path hfile = hfileStatus.getPath();
958 HFile.Reader reader = HFile.createReader(fs, hfile,
959 new CacheConfig(getConf()), getConf());
960 try {
961 if (hcd.getCompressionType() != reader.getFileContext().getCompression()) {
962 hcd.setCompressionType(reader.getFileContext().getCompression());
963 LOG.info("Setting compression " + hcd.getCompressionType().name() +
964 " for family " + hcd.toString());
965 }
966 reader.loadFileInfo();
967 byte[] first = reader.getFirstRowKey();
968 byte[] last = reader.getLastRowKey();
969
970 LOG.info("Trying to figure out region boundaries hfile=" + hfile +
971 " first=" + Bytes.toStringBinary(first) +
972 " last=" + Bytes.toStringBinary(last));
973
974
975 Integer value = map.containsKey(first)? map.get(first):0;
976 map.put(first, value+1);
977
978 value = map.containsKey(last)? map.get(last):0;
979 map.put(last, value-1);
980 } finally {
981 reader.close();
982 }
983 }
984 });
985
986 byte[][] keys = LoadIncrementalHFiles.inferBoundaries(map);
987 this.hbAdmin.createTable(htd,keys);
988
989 LOG.info("Table "+ tableName +" is available!!");
990 }
991
992 @Override
993 public int run(String[] args) throws Exception {
994 if (args.length != 2) {
995 usage();
996 return -1;
997 }
998
999 initialize();
1000
1001 String dirPath = args[0];
1002 TableName tableName = TableName.valueOf(args[1]);
1003
1004 boolean tableExists = this.doesTableExist(tableName);
1005 if (!tableExists) {
1006 if ("yes".equalsIgnoreCase(getConf().get(CREATE_TABLE_CONF_KEY, "yes"))) {
1007 this.createTable(tableName, dirPath);
1008 } else {
1009 String errorMsg = format("Table '%s' does not exist.", tableName);
1010 LOG.error(errorMsg);
1011 throw new TableNotFoundException(errorMsg);
1012 }
1013 }
1014
1015 Path hfofDir = new Path(dirPath);
1016
1017 try (Connection connection = ConnectionFactory.createConnection(getConf());
1018 HTable table = (HTable) connection.getTable(tableName);) {
1019 doBulkLoad(hfofDir, table);
1020 }
1021 return 0;
1022 }
1023
1024 public static void main(String[] args) throws Exception {
1025 Configuration conf = HBaseConfiguration.create();
1026 int ret = ToolRunner.run(conf, new LoadIncrementalHFiles(), args);
1027 System.exit(ret);
1028 }
1029
1030 }