/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.util;

import com.google.errorprone.annotations.RestrictedApi;
import edu.umd.cs.findbugs.annotations.Nullable;
import java.io.EOFException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableDescriptors;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.CoprocessorDescriptorBuilder;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.primitives.Ints;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
 * Implementation of {@link TableDescriptors} that reads descriptors from the passed filesystem. It
 * expects descriptors to be in a file in the {@link #TABLEINFO_DIR} subdir of the table's directory
 * in FS. Can be read-only -- i.e. it does not modify the filesystem -- or read-write.
 * <p>
 * Also has utility for keeping up the table descriptor's tableinfo file. The table schema file is
 * kept in the {@link #TABLEINFO_DIR} subdir of the table directory in the filesystem. It has a
 * {@link #TABLEINFO_FILE_PREFIX} and then a suffix that is the edit sequenceid: e.g.
 * <code>.tableinfo.0000000003</code>. This sequenceid is always increasing. It starts at zero. The
 * table schema file with the highest sequenceid has the most recent schema edit. Usually there is
 * one file only, the most recent, but there may be short periods where more than one file exists.
 * Old files are eventually cleaned up. The presumption is that there will not be lots of concurrent
 * clients making table schema edits; if there are, the below needs a bit of reworking and perhaps
 * some supporting api in hdfs.
 */
@InterfaceAudience.Private
public class FSTableDescriptors implements TableDescriptors {
  private static final Logger LOG = LoggerFactory.getLogger(FSTableDescriptors.class);
  private final FileSystem fs;
  private final Path rootdir;
  private final boolean fsreadonly;
  private final boolean usecache;
  private volatile boolean fsvisited;
  private boolean tableDescriptorParallelLoadEnable = false;
  private ThreadPoolExecutor executor;

  long cachehits = 0;
  long invocations = 0;

  /**
   * The file name prefix used to store HTD in HDFS
   */
  static final String TABLEINFO_FILE_PREFIX = ".tableinfo";
  public static final String TABLEINFO_DIR = ".tabledesc";
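  // Illustrative layout only (the table dir below is schematic, not a literal path): a table's
  // descriptor lives at <tableDir>/.tabledesc/.tableinfo.0000000003.1068, where 0000000003 is the
  // edit sequence id and 1068 stands in for the length in bytes of the serialized descriptor.
  // Older releases wrote names without the trailing length, e.g. .tableinfo.0000000003; both forms
  // are readable, see getTableInfoSequenceIdAndFileLength below.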
  // This cache does not age out the old stuff. Thinking is that the amount
  // of data we keep up in here is so small, no need to do occasional purge.
  // TODO.
  private final Map<TableName, TableDescriptor> cache = new ConcurrentHashMap<>();

  /**
   * Construct a FSTableDescriptors instance using the hbase root dir of the given conf and the
   * filesystem where that root dir lives. This instance can do write operations (is not read only).
   */
  public FSTableDescriptors(final Configuration conf) throws IOException {
    this(CommonFSUtils.getCurrentFileSystem(conf), CommonFSUtils.getRootDir(conf));
  }

  public FSTableDescriptors(final FileSystem fs, final Path rootdir) {
    this(fs, rootdir, false, true);
  }

  public FSTableDescriptors(final FileSystem fs, final Path rootdir, final boolean fsreadonly,
    final boolean usecache) {
    this(fs, rootdir, fsreadonly, usecache, 0);
  }

  public FSTableDescriptors(final FileSystem fs, final Path rootdir, final boolean fsreadonly,
    final boolean usecache, final int tableDescriptorParallelLoadThreads) {
    this.fs = fs;
    this.rootdir = rootdir;
    this.fsreadonly = fsreadonly;
    this.usecache = usecache;
    if (tableDescriptorParallelLoadThreads > 0) {
      tableDescriptorParallelLoadEnable = true;
      executor = new ThreadPoolExecutor(tableDescriptorParallelLoadThreads,
        tableDescriptorParallelLoadThreads, 1, TimeUnit.SECONDS, new LinkedBlockingQueue<>(),
        new ThreadFactoryBuilder().setNameFormat("FSTableDescriptorLoad-pool-%d").setDaemon(true)
          .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
      executor.allowCoreThreadTimeOut(true);
    }
  }
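  // Usage sketch (illustrative only; the table name below is made up): a write-capable, caching
  // instance built from a Configuration, used to look up a descriptor.
  //
  //   FSTableDescriptors fstd = new FSTableDescriptors(conf);
  //   TableDescriptor htd = fstd.get(TableName.valueOf("example_table"));
  //
  // Pass fsreadonly=true / usecache=false to the longer constructors for read-only, uncached
  // access, and a positive tableDescriptorParallelLoadThreads to let getAll() load in parallel.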
  public static void tryUpdateMetaTableDescriptor(Configuration conf) throws IOException {
    tryUpdateAndGetMetaTableDescriptor(conf, CommonFSUtils.getCurrentFileSystem(conf),
      CommonFSUtils.getRootDir(conf));
  }

  public static TableDescriptor tryUpdateAndGetMetaTableDescriptor(Configuration conf,
    FileSystem fs, Path rootdir) throws IOException {
    // see if we already have meta descriptor on fs. Write one if not.
    Optional<Pair<FileStatus, TableDescriptor>> opt = getTableDescriptorFromFs(fs,
      CommonFSUtils.getTableDir(rootdir, TableName.META_TABLE_NAME), false);
    if (opt.isPresent()) {
      return opt.get().getSecond();
    }
    TableDescriptorBuilder builder = createMetaTableDescriptorBuilder(conf);
    TableDescriptor td = StoreFileTrackerFactory.updateWithTrackerConfigs(conf, builder.build());
    LOG.info("Creating new hbase:meta table descriptor {}", td);
    TableName tableName = td.getTableName();
    Path tableDir = CommonFSUtils.getTableDir(rootdir, tableName);
    Path p = writeTableDescriptor(fs, td, tableDir, null);
    if (p == null) {
      throw new IOException("Failed update hbase:meta table descriptor");
    }
    LOG.info("Updated hbase:meta table descriptor to {}", p);
    return td;
  }

  public static ColumnFamilyDescriptor getTableFamilyDescForMeta(final Configuration conf) {
    return ColumnFamilyDescriptorBuilder.newBuilder(HConstants.TABLE_FAMILY)
      .setMaxVersions(
        conf.getInt(HConstants.HBASE_META_VERSIONS, HConstants.DEFAULT_HBASE_META_VERSIONS))
      .setInMemory(true).setBlocksize(8 * 1024).setScope(HConstants.REPLICATION_SCOPE_LOCAL)
      .setDataBlockEncoding(DataBlockEncoding.ROW_INDEX_V1).setBloomFilterType(BloomType.ROWCOL)
      .build();
  }

  public static ColumnFamilyDescriptor getReplBarrierFamilyDescForMeta() {
    return ColumnFamilyDescriptorBuilder.newBuilder(HConstants.REPLICATION_BARRIER_FAMILY)
      .setMaxVersions(HConstants.ALL_VERSIONS).setInMemory(true)
      .setScope(HConstants.REPLICATION_SCOPE_LOCAL)
      .setDataBlockEncoding(DataBlockEncoding.ROW_INDEX_V1).setBloomFilterType(BloomType.ROWCOL)
      .build();
  }

  public static ColumnFamilyDescriptor getNamespaceFamilyDescForMeta(Configuration conf) {
    return ColumnFamilyDescriptorBuilder.newBuilder(HConstants.NAMESPACE_FAMILY)
      .setMaxVersions(
        conf.getInt(HConstants.HBASE_META_VERSIONS, HConstants.DEFAULT_HBASE_META_VERSIONS))
      .setInMemory(true)
      .setBlocksize(
        conf.getInt(HConstants.HBASE_META_BLOCK_SIZE, HConstants.DEFAULT_HBASE_META_BLOCK_SIZE))
      .setScope(HConstants.REPLICATION_SCOPE_LOCAL)
      .setDataBlockEncoding(DataBlockEncoding.ROW_INDEX_V1).setBloomFilterType(BloomType.ROWCOL)
      .build();
  }

  private static TableDescriptorBuilder createMetaTableDescriptorBuilder(final Configuration conf)
    throws IOException {
    // TODO We used to set CacheDataInL1 for META table. When we have BucketCache in file mode, now
    // the META table data goes to File mode BC only. Test how that affects the system. If too much,
    // we have to rethink adding back the setCacheDataInL1 for META table CFs.
    return TableDescriptorBuilder.newBuilder(TableName.META_TABLE_NAME)
      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(HConstants.CATALOG_FAMILY)
        .setMaxVersions(
          conf.getInt(HConstants.HBASE_META_VERSIONS, HConstants.DEFAULT_HBASE_META_VERSIONS))
        .setInMemory(true)
        .setBlocksize(
          conf.getInt(HConstants.HBASE_META_BLOCK_SIZE, HConstants.DEFAULT_HBASE_META_BLOCK_SIZE))
        .setScope(HConstants.REPLICATION_SCOPE_LOCAL).setBloomFilterType(BloomType.ROWCOL)
        .setDataBlockEncoding(DataBlockEncoding.ROW_INDEX_V1).build())
      .setColumnFamily(getTableFamilyDescForMeta(conf))
      .setColumnFamily(getReplBarrierFamilyDescForMeta())
      .setColumnFamily(getNamespaceFamilyDescForMeta(conf)).setCoprocessor(
        CoprocessorDescriptorBuilder.newBuilder(MultiRowMutationEndpoint.class.getName())
          .setPriority(Coprocessor.PRIORITY_SYSTEM).build());
  }

  protected boolean isUsecache() {
    return this.usecache;
  }

  /**
   * Get the current table descriptor for the given table, or null if none exists.
   * <p/>
   * Uses a local cache of the descriptor but still checks the filesystem on each call if
   * {@link #fsvisited} is not {@code true}, i.e., we haven't done a full scan yet, to see if a
   * newer file has been created since the cached one was read.
   */
  @Override
  @Nullable
  public TableDescriptor get(TableName tableName) {
    invocations++;
    if (usecache) {
      // Look in cache of descriptors.
      TableDescriptor cachedtdm = this.cache.get(tableName);
      if (cachedtdm != null) {
        cachehits++;
        return cachedtdm;
      }
      // we do not need to go to fs any more
      if (fsvisited) {
        return null;
      }
    }
    TableDescriptor tdmt = null;
    try {
      tdmt = getTableDescriptorFromFs(fs, getTableDir(tableName), fsreadonly).map(Pair::getSecond)
        .orElse(null);
    } catch (IOException ioe) {
      LOG.debug("Exception during readTableDescriptor. Current table name = " + tableName, ioe);
    }
    // last HTD written wins
    if (usecache && tdmt != null) {
      this.cache.put(tableName, tdmt);
    }

    return tdmt;
  }
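  // Informational note on get() above: with usecache=true, the first lookup for a table reads its
  // tableinfo file and caches the descriptor; once a full getAll() scan has set fsvisited, a cache
  // miss is treated as "no such table" and the filesystem is not consulted again until the cache
  // is updated (see update()/remove()).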
  /**
   * Returns a map from table name to table descriptor for all tables.
   */
  @Override
  public Map<String, TableDescriptor> getAll() throws IOException {
    Map<String, TableDescriptor> tds = new ConcurrentSkipListMap<>();
    if (fsvisited) {
      for (Map.Entry<TableName, TableDescriptor> entry : this.cache.entrySet()) {
        tds.put(entry.getKey().getNameWithNamespaceInclAsString(), entry.getValue());
      }
    } else {
      LOG.info("Fetching table descriptors from the filesystem.");
      final long startTime = EnvironmentEdgeManager.currentTime();
      AtomicBoolean allvisited = new AtomicBoolean(usecache);
      List<Path> tableDirs = FSUtils.getTableDirs(fs, rootdir);
      if (!tableDescriptorParallelLoadEnable) {
        for (Path dir : tableDirs) {
          internalGet(dir, tds, allvisited);
        }
      } else {
        CountDownLatch latch = new CountDownLatch(tableDirs.size());
        for (Path dir : tableDirs) {
          executor.submit(new Runnable() {
            @Override
            public void run() {
              try {
                internalGet(dir, tds, allvisited);
              } finally {
                latch.countDown();
              }
            }
          });
        }
        try {
          latch.await();
        } catch (InterruptedException ie) {
          throw (InterruptedIOException) new InterruptedIOException().initCause(ie);
        }
      }
      fsvisited = allvisited.get();
      LOG.info("Fetched table descriptors(size=" + tds.size() + ") cost "
        + (EnvironmentEdgeManager.currentTime() - startTime) + "ms.");
    }
    return tds;
  }

  private void internalGet(Path dir, Map<String, TableDescriptor> tds, AtomicBoolean allvisited) {
    TableDescriptor htd = get(CommonFSUtils.getTableName(dir));
    if (htd == null) {
      allvisited.set(false);
    } else {
      tds.put(htd.getTableName().getNameWithNamespaceInclAsString(), htd);
    }
  }

  /**
   * Find descriptors by namespace.
   * @see #get(org.apache.hadoop.hbase.TableName)
   */
  @Override
  public Map<String, TableDescriptor> getByNamespace(String name) throws IOException {
    Map<String, TableDescriptor> htds = new TreeMap<>();
    List<Path> tableDirs =
      FSUtils.getLocalTableDirs(fs, CommonFSUtils.getNamespaceDir(rootdir, name));
    for (Path d : tableDirs) {
      TableDescriptor htd = get(CommonFSUtils.getTableName(d));
      if (htd == null) {
        continue;
      }
      htds.put(CommonFSUtils.getTableName(d).getNameAsString(), htd);
    }
    return htds;
  }
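  // Informational: the two lookups above key their result maps differently. getAll() keys entries
  // by TableName#getNameWithNamespaceInclAsString (e.g. "default:example_table"), while
  // getByNamespace() keys by TableName#getNameAsString; the example name is made up.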
  @Override
  public void update(TableDescriptor td, boolean cacheOnly) throws IOException {
    // TODO: in fact this method will only be called at master side, so fsreadonly and usecache
    // will always be true. In general, we'd better have a ReadOnlyFSTableDescriptors for
    // HRegionServer but now, HMaster extends HRegionServer, so unless we make use of generics, we
    // cannot have different implementations for HMaster and HRegionServer. Revisit this when we
    // make HMaster not extend HRegionServer in the future.
    if (fsreadonly) {
      throw new UnsupportedOperationException("Cannot add a table descriptor - in read only mode");
    }
    if (!cacheOnly) {
      updateTableDescriptor(td);
    }
    if (usecache) {
      this.cache.put(td.getTableName(), td);
    }
  }

  @RestrictedApi(explanation = "Should only be called in tests or self", link = "",
      allowedOnPath = ".*/src/test/.*|.*/FSTableDescriptors\\.java")
  Path updateTableDescriptor(TableDescriptor td) throws IOException {
    TableName tableName = td.getTableName();
    Path tableDir = getTableDir(tableName);
    Path p = writeTableDescriptor(fs, td, tableDir,
      getTableDescriptorFromFs(fs, tableDir, fsreadonly).map(Pair::getFirst).orElse(null));
    if (p == null) {
      throw new IOException("Failed update");
    }
    LOG.info("Updated tableinfo=" + p);
    return p;
  }

  /**
   * Removes the table descriptor from the local cache and returns it. If not in read only mode, it
   * also deletes the entire table directory(!) from the FileSystem.
   */
  @Override
  public TableDescriptor remove(final TableName tablename) throws IOException {
    if (fsreadonly) {
      throw new NotImplementedException("Cannot remove a table descriptor - in read only mode");
    }
    Path tabledir = getTableDir(tablename);
    if (this.fs.exists(tabledir)) {
      if (!this.fs.delete(tabledir, true)) {
        throw new IOException("Failed delete of " + tabledir.toString());
      }
    }
    TableDescriptor descriptor = this.cache.remove(tablename);
    return descriptor;
  }

  /**
   * Check whether the given table directory contains a valid table descriptor.
   */
  public static boolean isTableDir(FileSystem fs, Path tableDir) throws IOException {
    return getTableDescriptorFromFs(fs, tableDir, true).isPresent();
  }

  /**
   * Compare {@link FileStatus} instances by {@link Path#getName()}. Returns in reverse order.
   */
  static final Comparator<FileStatus> TABLEINFO_FILESTATUS_COMPARATOR =
    new Comparator<FileStatus>() {
      @Override
      public int compare(FileStatus left, FileStatus right) {
        return right.getPath().getName().compareTo(left.getPath().getName());
      }
    };

  /**
   * Return the table directory in HDFS
   */
  private Path getTableDir(TableName tableName) {
    return CommonFSUtils.getTableDir(rootdir, tableName);
  }

  private static final PathFilter TABLEINFO_PATHFILTER = new PathFilter() {
    @Override
    public boolean accept(Path p) {
      // Accept any file that starts with TABLEINFO_FILE_PREFIX
      return p.getName().startsWith(TABLEINFO_FILE_PREFIX);
    }
  };

  /**
   * Width of the sequenceid that is a suffix on a tableinfo file.
   */
  static final int WIDTH_OF_SEQUENCE_ID = 10;

  /**
   * @param number Number to use as suffix.
   * @return Returns zero-prefixed decimal version of passed number (uses the absolute value in
   *         case the number is negative).
   */
  private static String formatTableInfoSequenceId(final int number) {
    byte[] b = new byte[WIDTH_OF_SEQUENCE_ID];
    int d = Math.abs(number);
    for (int i = b.length - 1; i >= 0; i--) {
      b[i] = (byte) ((d % 10) + '0');
      d /= 10;
    }
    return Bytes.toString(b);
  }
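  // For example, formatTableInfoSequenceId(3) returns "0000000003"; combined with the serialized
  // descriptor length by getTableInfoFileName below, that yields a file name such as
  // ".tableinfo.0000000003.1068" (the length 1068 is an arbitrary illustration).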
  @Override
  public void close() throws IOException {
    // Close the executor when parallel loading is enabled.
    if (tableDescriptorParallelLoadEnable) {
      this.executor.shutdown();
    }
  }

  static final class SequenceIdAndFileLength {

    final int sequenceId;

    final int fileLength;

    SequenceIdAndFileLength(int sequenceId, int fileLength) {
      this.sequenceId = sequenceId;
      this.fileLength = fileLength;
    }
  }

  /**
   * Returns the current sequence id and file length or 0 if none found.
   * @param p Path to a <code>.tableinfo</code> file.
   */
  @RestrictedApi(explanation = "Should only be called in tests or self", link = "",
      allowedOnPath = ".*/src/test/.*|.*/FSTableDescriptors\\.java")
  static SequenceIdAndFileLength getTableInfoSequenceIdAndFileLength(Path p) {
    String name = p.getName();
    if (!name.startsWith(TABLEINFO_FILE_PREFIX)) {
      throw new IllegalArgumentException("Invalid table descriptor file name: " + name);
    }
    int firstDot = name.indexOf('.', TABLEINFO_FILE_PREFIX.length());
    if (firstDot < 0) {
      // oldest style where we have neither sequence id nor file length
      return new SequenceIdAndFileLength(0, 0);
    }
    int secondDot = name.indexOf('.', firstDot + 1);
    if (secondDot < 0) {
      // old style where we do not have file length
      int sequenceId = Integer.parseInt(name.substring(firstDot + 1));
      return new SequenceIdAndFileLength(sequenceId, 0);
    }
    int sequenceId = Integer.parseInt(name.substring(firstDot + 1, secondDot));
    int fileLength = Integer.parseInt(name.substring(secondDot + 1));
    return new SequenceIdAndFileLength(sequenceId, fileLength);
  }

  /**
   * Returns Name of tableinfo file.
   */
  @RestrictedApi(explanation = "Should only be called in tests or self", link = "",
      allowedOnPath = ".*/src/test/.*|.*/FSTableDescriptors\\.java")
  static String getTableInfoFileName(int sequenceId, byte[] content) {
    return TABLEINFO_FILE_PREFIX + "." + formatTableInfoSequenceId(sequenceId) + "."
      + content.length;
  }
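  // Parsing examples for the two methods above (file names are illustrative):
  //   ".tableinfo"                 -> sequence id 0, file length 0 (oldest format)
  //   ".tableinfo.0000000003"      -> sequence id 3, file length 0 (old format)
  //   ".tableinfo.0000000003.1068" -> sequence id 3, file length 1068 (current format)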
  /**
   * Returns the latest table descriptor for the given table directly from the file system if it
   * exists, bypassing the local cache. Returns null if it's not found.
   */
  public static TableDescriptor getTableDescriptorFromFs(FileSystem fs, Path hbaseRootDir,
    TableName tableName) throws IOException {
    Path tableDir = CommonFSUtils.getTableDir(hbaseRootDir, tableName);
    return getTableDescriptorFromFs(fs, tableDir);
  }

  /**
   * Returns the latest table descriptor for the table located at the given directory directly from
   * the file system if it exists.
   */
  public static TableDescriptor getTableDescriptorFromFs(FileSystem fs, Path tableDir)
    throws IOException {
    return getTableDescriptorFromFs(fs, tableDir, true).map(Pair::getSecond).orElse(null);
  }

  private static void deleteMalformedFile(FileSystem fs, Path file) throws IOException {
    LOG.info("Delete malformed table descriptor file {}", file);
    if (!fs.delete(file, false)) {
      LOG.warn("Failed to delete malformed table descriptor file {}", file);
    }
  }

  private static Optional<Pair<FileStatus, TableDescriptor>> getTableDescriptorFromFs(FileSystem fs,
    Path tableDir, boolean readonly) throws IOException {
    Path tableInfoDir = new Path(tableDir, TABLEINFO_DIR);
    FileStatus[] descFiles = CommonFSUtils.listStatus(fs, tableInfoDir, TABLEINFO_PATHFILTER);
    if (descFiles == null || descFiles.length < 1) {
      return Optional.empty();
    }
    Arrays.sort(descFiles, TABLEINFO_FILESTATUS_COMPARATOR);
    int i = 0;
    TableDescriptor td = null;
    FileStatus descFile = null;
    for (; i < descFiles.length; i++) {
      descFile = descFiles[i];
      Path file = descFile.getPath();
      // get file length from file name if present
      int fileLength = getTableInfoSequenceIdAndFileLength(file).fileLength;
      byte[] content = new byte[fileLength > 0 ? fileLength : Ints.checkedCast(descFile.getLen())];
      try (FSDataInputStream in = fs.open(file)) {
        in.readFully(content);
      } catch (EOFException e) {
        LOG.info("Failed to load file {} due to EOF, it should be half written: {}", file,
          e.toString());
        if (!readonly) {
          deleteMalformedFile(fs, file);
        }
        continue;
      }
      try {
        td = TableDescriptorBuilder.parseFrom(content);
        break;
      } catch (DeserializationException e) {
        LOG.info("Failed to parse file {} due to malformed protobuf message: {}", file,
          e.toString());
        if (!readonly) {
          deleteMalformedFile(fs, file);
        }
      }
    }
    if (!readonly) {
      // i + 1 to skip the one we loaded
      for (i = i + 1; i < descFiles.length; i++) {
        Path file = descFiles[i].getPath();
        LOG.info("Delete old table descriptor file {}", file);
        if (!fs.delete(file, false)) {
          LOG.info("Failed to delete old table descriptor file {}", file);
        }
      }
    }
    return td != null ? Optional.of(Pair.newPair(descFile, td)) : Optional.empty();
  }
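  // Worked example of the fallback above (file names made up): if the tableinfo dir holds a
  // half-written ".tableinfo.0000000004.900" and an intact ".tableinfo.0000000003.850", the
  // seqid-4 file fails to read or parse and is skipped (and deleted when readonly is false),
  // the seqid-3 descriptor is returned, and any files older than it are cleaned up.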
  @RestrictedApi(explanation = "Should only be called in tests", link = "",
      allowedOnPath = ".*/src/test/.*")
  public static void deleteTableDescriptors(FileSystem fs, Path tableDir) throws IOException {
    Path tableInfoDir = new Path(tableDir, TABLEINFO_DIR);
    deleteTableDescriptorFiles(fs, tableInfoDir, Integer.MAX_VALUE);
  }

  /**
   * Deletes files matching the table info file pattern within the given directory whose sequenceId
   * is at most the given max sequenceId.
   */
  private static void deleteTableDescriptorFiles(FileSystem fs, Path dir, int maxSequenceId)
    throws IOException {
    FileStatus[] status = CommonFSUtils.listStatus(fs, dir, TABLEINFO_PATHFILTER);
    if (status == null) {
      // listStatus returns null when the directory is missing or has no matching files; nothing
      // to delete.
      return;
    }
    for (FileStatus file : status) {
      Path path = file.getPath();
      int sequenceId = getTableInfoSequenceIdAndFileLength(path).sequenceId;
      if (sequenceId <= maxSequenceId) {
        boolean success = CommonFSUtils.delete(fs, path, false);
        if (success) {
          LOG.debug("Deleted {}", path);
        } else {
          LOG.error("Failed to delete table descriptor at {}", path);
        }
      }
    }
  }

  /**
   * Attempts to write a new table descriptor to the given table's directory. It begins at the
   * currentSequenceId + 1 and tries 10 times to find a new sequence number not already in use.
   * <p/>
   * Removes the current descriptor file if passed in.
   * @return Descriptor file or null if we failed to write.
   */
  private static Path writeTableDescriptor(final FileSystem fs, final TableDescriptor td,
    final Path tableDir, final FileStatus currentDescriptorFile) throws IOException {
    // Here we will write to the final directory directly to avoid renaming as on OSS renaming is
    // not atomic and has performance issues. The reason why we could do this is that, in the below
    // code we will not overwrite existing files, we will write a new file instead. And when
    // loading, we will skip the half written file, please see the code in getTableDescriptorFromFs
    Path tableInfoDir = new Path(tableDir, TABLEINFO_DIR);

    // In proc v2 we have table lock so typically, there will be no concurrent writes. Keep the
    // retry logic here since we may still want to write the table descriptor from, for example,
    // HBCK2?
    int currentSequenceId = currentDescriptorFile == null
      ? 0
      : getTableInfoSequenceIdAndFileLength(currentDescriptorFile.getPath()).sequenceId;

    // Put an arbitrary upper bound on how often we retry
    int maxAttempts = 10;
    int maxSequenceId = currentSequenceId + maxAttempts;
    byte[] bytes = TableDescriptorBuilder.toByteArray(td);
    for (int newSequenceId = currentSequenceId + 1; newSequenceId
      <= maxSequenceId; newSequenceId++) {
      String fileName = getTableInfoFileName(newSequenceId, bytes);
      Path filePath = new Path(tableInfoDir, fileName);
      try (FSDataOutputStream out = fs.create(filePath, false)) {
        out.write(bytes);
      } catch (FileAlreadyExistsException e) {
        LOG.debug("{} exists; retrying up to {} times", filePath, maxAttempts, e);
        continue;
      } catch (IOException e) {
        LOG.debug("Failed write {}; retrying up to {} times", filePath, maxAttempts, e);
        continue;
      }
      deleteTableDescriptorFiles(fs, tableInfoDir, newSequenceId - 1);
      return filePath;
    }
    return null;
  }
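  // Worked example of the retry loop above (sequence ids made up): with a current descriptor at
  // sequence id 3, the writer attempts ids 4 through 13. If ".tableinfo.0000000004.<len>" already
  // exists it moves on to id 5, and so on; after the first successful create it deletes every
  // remaining tableinfo file with a sequence id lower than the one just written and returns the
  // new path. If all ten attempts fail, null is returned.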
  /**
   * Create new TableDescriptor in HDFS. Happens when we are creating a table. Used by tests.
   * @return True if we successfully created file.
   */
  public boolean createTableDescriptor(TableDescriptor htd) throws IOException {
    return createTableDescriptor(htd, false);
  }

  /**
   * Create new TableDescriptor in HDFS. Happens when we are creating a table. If forceCreation is
   * true then even if a previous table descriptor is present it will be overwritten.
   * @return True if we successfully created file.
   */
  public boolean createTableDescriptor(TableDescriptor htd, boolean forceCreation)
    throws IOException {
    Path tableDir = getTableDir(htd.getTableName());
    return createTableDescriptorForTableDirectory(tableDir, htd, forceCreation);
  }

  /**
   * Create a new TableDescriptor in HDFS in the specified table directory. Happens when we create
   * a new table during cluster start or in the Clone and Create Table Procedures. Checks the
   * readOnly flag passed on construction.
   * @param tableDir table directory under which we should write the file
   * @param htd description of the table to write
   * @param forceCreation if <tt>true</tt>, then even if a previous table descriptor is present it
   *                      will be overwritten
   * @return <tt>true</tt> if we successfully created the file, <tt>false</tt> if the file already
   *         exists and we weren't forcing the descriptor creation.
   * @throws IOException if a filesystem error occurs
   */
  public boolean createTableDescriptorForTableDirectory(Path tableDir, TableDescriptor htd,
    boolean forceCreation) throws IOException {
    if (this.fsreadonly) {
      throw new NotImplementedException("Cannot create a table descriptor - in read only mode");
    }
    return createTableDescriptorForTableDirectory(this.fs, tableDir, htd, forceCreation);
  }

  /**
   * Create a new TableDescriptor in HDFS in the specified table directory. Happens when we create
   * a new table or snapshot a table. Does not enforce read-only. That is for the caller to
   * determine.
   * @param fs Filesystem to use.
   * @param tableDir table directory under which we should write the file
   * @param htd description of the table to write
   * @param forceCreation if <tt>true</tt>, then even if a previous table descriptor is present it
   *                      will be overwritten
   * @return <tt>true</tt> if we successfully created the file, <tt>false</tt> if the file already
   *         exists and we weren't forcing the descriptor creation.
   * @throws IOException if a filesystem error occurs
   */
  public static boolean createTableDescriptorForTableDirectory(FileSystem fs, Path tableDir,
    TableDescriptor htd, boolean forceCreation) throws IOException {
    Optional<Pair<FileStatus, TableDescriptor>> opt = getTableDescriptorFromFs(fs, tableDir, false);
    if (opt.isPresent()) {
      LOG.debug("Current path={}", opt.get().getFirst());
      if (!forceCreation) {
        if (htd.equals(opt.get().getSecond())) {
          LOG.trace("TableInfo already exists.. Skipping creation");
          return false;
        }
      }
    }
    return writeTableDescriptor(fs, htd, tableDir, opt.map(Pair::getFirst).orElse(null)) != null;
  }
}