/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.util;

import com.google.errorprone.annotations.RestrictedApi;
import edu.umd.cs.findbugs.annotations.Nullable;
import java.io.EOFException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableDescriptors;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.CoprocessorDescriptorBuilder;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.primitives.Ints;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
 * Implementation of {@link TableDescriptors} that reads descriptors from the passed filesystem. It
 * expects descriptors to be in a file in the {@link #TABLEINFO_DIR} subdir of the table's directory
 * in FS. Can be read-only -- i.e. does not modify the filesystem or can be read and write.
 * <p>
 * Also has utility for keeping up the table descriptors tableinfo file. The table schema file is
 * kept in the {@link #TABLEINFO_DIR} subdir of the table directory in the filesystem. It has a
 * {@link #TABLEINFO_FILE_PREFIX} and then a suffix that is the edit sequenceid: e.g.
 * <code>.tableinfo.0000000003</code>. This sequenceid is always increasing. It starts at zero. The
 * table schema file with the highest sequenceid has the most recent schema edit. Usually there is
 * one file only, the most recent but there may be short periods where there are more than one file.
 * Old files are eventually cleaned. Presumption is that there will not be lots of concurrent
 * clients making table schema edits. If so, the below needs a bit of a reworking and perhaps some
 * supporting api in hdfs.
 */
@InterfaceAudience.Private
public class FSTableDescriptors implements TableDescriptors {
  private static final Logger LOG = LoggerFactory.getLogger(FSTableDescriptors.class);
  private final FileSystem fs;
  private final Path rootdir;
  private final boolean fsreadonly;
  private final boolean usecache;
  // Set once getAll() has loaded a descriptor for every table dir it listed; lets get() skip the fs.
  private volatile boolean fsvisited;
  private boolean tableDescriptorParallelLoadEnable = false;
  // Only created when parallel loading is enabled (see the five-argument constructor).
  private ThreadPoolExecutor executor;

  // Plain counters bumped by get() without synchronization.
  long cachehits = 0;
  long invocations = 0;

  /**
   * The file name prefix used to store HTD in HDFS
   */
  static final String TABLEINFO_FILE_PREFIX = ".tableinfo";
  public static final String TABLEINFO_DIR = ".tabledesc";

  // This cache does not age out the old stuff. Thinking is that the amount
  // of data we keep up in here is so small, no need to do occasional purge.
  // TODO.
  private final Map<TableName, TableDescriptor> cache = new ConcurrentHashMap<>();

  /**
   * Construct a FSTableDescriptors instance using the hbase root dir of the given conf and the
   * filesystem where that root dir lives. This instance can do write operations (is not read only).
   */
  public FSTableDescriptors(final Configuration conf) throws IOException {
    this(CommonFSUtils.getCurrentFileSystem(conf), CommonFSUtils.getRootDir(conf));
  }

  /**
   * Construct a writable, caching instance over the given filesystem and root dir.
   */
  public FSTableDescriptors(final FileSystem fs, final Path rootdir) {
    this(fs, rootdir, false, true);
  }

  /**
   * Construct an instance with parallel descriptor loading disabled.
   * @param fsreadonly if {@code true}, mutating operations on this instance are rejected
   * @param usecache   if {@code true}, descriptors read from the filesystem are kept in memory
   */
  public FSTableDescriptors(final FileSystem fs, final Path rootdir, final boolean fsreadonly,
    final boolean usecache) {
    this(fs, rootdir, fsreadonly, usecache, 0);
  }

  /**
   * Construct an instance.
   * @param fsreadonly                         if {@code true}, mutating operations on this instance
   *                                           are rejected
   * @param usecache                           if {@code true}, descriptors read from the filesystem
   *                                           are kept in memory
   * @param tableDescriptorParallelLoadThreads size of the thread pool used by {@link #getAll()};
   *                                           a value of zero or less disables parallel loading
   */
  public FSTableDescriptors(final FileSystem fs, final Path rootdir, final boolean fsreadonly,
    final boolean usecache, final int tableDescriptorParallelLoadThreads) {
    this.fs = fs;
    this.rootdir = rootdir;
    this.fsreadonly = fsreadonly;
    this.usecache = usecache;
    if (tableDescriptorParallelLoadThreads > 0) {
      tableDescriptorParallelLoadEnable = true;
      executor = new ThreadPoolExecutor(tableDescriptorParallelLoadThreads,
        tableDescriptorParallelLoadThreads, 1, TimeUnit.SECONDS, new LinkedBlockingQueue<>(),
        new ThreadFactoryBuilder().setNameFormat("FSTableDescriptorLoad-pool-%d").setDaemon(true)
          .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
      // Let idle core threads exit so the pool does not pin threads between loads.
      executor.allowCoreThreadTimeOut(true);
    }
  }

  /**
   * Writes the meta table descriptor to the filesystem configured in {@code conf} if none is
   * present there yet.
   */
  public static void tryUpdateMetaTableDescriptor(Configuration conf) throws IOException {
    tryUpdateAndGetMetaTableDescriptor(conf, CommonFSUtils.getCurrentFileSystem(conf),
      CommonFSUtils.getRootDir(conf));
  }

  /**
   * Returns the meta table descriptor found on the filesystem, writing a freshly built one first
   * if none is present.
   * @throws IOException if the descriptor could not be read or written
   */
  public static TableDescriptor tryUpdateAndGetMetaTableDescriptor(Configuration conf,
    FileSystem fs, Path rootdir) throws IOException {
    // see if we already have meta descriptor on fs. Write one if not.
    Optional<Pair<FileStatus, TableDescriptor>> opt = getTableDescriptorFromFs(fs,
      CommonFSUtils.getTableDir(rootdir, TableName.META_TABLE_NAME), false);
    if (opt.isPresent()) {
      return opt.get().getSecond();
    }
    TableDescriptorBuilder builder = createMetaTableDescriptorBuilder(conf);
    TableDescriptor td = StoreFileTrackerFactory.updateWithTrackerConfigs(conf, builder.build());
    LOG.info("Creating new {} table descriptor {}", TableName.META_TABLE_NAME, td);
    TableName tableName = td.getTableName();
    Path tableDir = CommonFSUtils.getTableDir(rootdir, tableName);
    Path p = writeTableDescriptor(fs, td, tableDir, null);
    if (p == null) {
      throw new IOException("Failed update " + TableName.META_TABLE_NAME + " table descriptor");
    }
    LOG.info("Updated {} table descriptor to {}", TableName.META_TABLE_NAME, p);
    return td;
  }

  /**
   * Builds the descriptor of the {@link HConstants#TABLE_FAMILY} column family of the meta table.
   */
  public static ColumnFamilyDescriptor getTableFamilyDescForMeta(final Configuration conf) {
    return ColumnFamilyDescriptorBuilder.newBuilder(HConstants.TABLE_FAMILY)
      .setMaxVersions(
        conf.getInt(HConstants.HBASE_META_VERSIONS, HConstants.DEFAULT_HBASE_META_VERSIONS))
      .setInMemory(true).setBlocksize(8 * 1024).setScope(HConstants.REPLICATION_SCOPE_LOCAL)
      .setDataBlockEncoding(DataBlockEncoding.ROW_INDEX_V1).setBloomFilterType(BloomType.ROWCOL)
      .build();
  }

  /**
   * Builds the descriptor of the {@link HConstants#REPLICATION_BARRIER_FAMILY} column family of
   * the meta table.
   */
  public static ColumnFamilyDescriptor getReplBarrierFamilyDescForMeta() {
    return ColumnFamilyDescriptorBuilder.newBuilder(HConstants.REPLICATION_BARRIER_FAMILY)
      .setMaxVersions(HConstants.ALL_VERSIONS).setInMemory(true)
      .setScope(HConstants.REPLICATION_SCOPE_LOCAL)
      .setDataBlockEncoding(DataBlockEncoding.ROW_INDEX_V1).setBloomFilterType(BloomType.ROWCOL)
      .build();
  }

  /**
   * Builds the descriptor of the {@link HConstants#NAMESPACE_FAMILY} column family of the meta
   * table.
   */
  public static ColumnFamilyDescriptor getNamespaceFamilyDescForMeta(Configuration conf) {
    return ColumnFamilyDescriptorBuilder.newBuilder(HConstants.NAMESPACE_FAMILY)
      .setMaxVersions(
        conf.getInt(HConstants.HBASE_META_VERSIONS, HConstants.DEFAULT_HBASE_META_VERSIONS))
      .setInMemory(true)
      .setBlocksize(
        conf.getInt(HConstants.HBASE_META_BLOCK_SIZE, HConstants.DEFAULT_HBASE_META_BLOCK_SIZE))
      .setScope(HConstants.REPLICATION_SCOPE_LOCAL)
      .setDataBlockEncoding(DataBlockEncoding.ROW_INDEX_V1).setBloomFilterType(BloomType.ROWCOL)
      .build();
  }

  private static TableDescriptorBuilder createMetaTableDescriptorBuilder(final Configuration conf)
    throws IOException {
    // TODO We used to set CacheDataInL1 for META table. When we have BucketCache in file mode, now
    // the META table data goes to File mode BC only. Test how that affect the system. If too much,
    // we have to rethink about adding back the setCacheDataInL1 for META table CFs.
    return TableDescriptorBuilder.newBuilder(TableName.META_TABLE_NAME)
      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(HConstants.CATALOG_FAMILY)
        .setMaxVersions(
          conf.getInt(HConstants.HBASE_META_VERSIONS, HConstants.DEFAULT_HBASE_META_VERSIONS))
        .setInMemory(true)
        .setBlocksize(
          conf.getInt(HConstants.HBASE_META_BLOCK_SIZE, HConstants.DEFAULT_HBASE_META_BLOCK_SIZE))
        .setScope(HConstants.REPLICATION_SCOPE_LOCAL).setBloomFilterType(BloomType.ROWCOL)
        .setDataBlockEncoding(DataBlockEncoding.ROW_INDEX_V1).build())
      .setColumnFamily(getTableFamilyDescForMeta(conf))
      .setColumnFamily(getReplBarrierFamilyDescForMeta())
      .setColumnFamily(getNamespaceFamilyDescForMeta(conf)).setCoprocessor(
        CoprocessorDescriptorBuilder.newBuilder(MultiRowMutationEndpoint.class.getName())
          .setPriority(Coprocessor.PRIORITY_SYSTEM).build());
  }

  protected boolean isUsecache() {
    return this.usecache;
  }

  /**
   * Get the current table descriptor for the given table, or null if none exists.
   * <p>
   * When caching is enabled a cached descriptor is returned directly. On a cache miss the
   * filesystem is consulted unless {@link #fsvisited} is {@code true}, i.e. a full scan has
   * already loaded every descriptor, in which case {@code null} is returned. An
   * {@link IOException} while reading is logged at debug level and also results in {@code null}.
   */
  @Override
  @Nullable
  public TableDescriptor get(TableName tableName) {
    invocations++;
    if (usecache) {
      // Look in cache of descriptors.
      TableDescriptor cachedtdm = this.cache.get(tableName);
      if (cachedtdm != null) {
        cachehits++;
        return cachedtdm;
      }
      // we do not need to go to the fs anymore
      if (fsvisited) {
        return null;
      }
    }
    TableDescriptor tdmt = null;
    try {
      tdmt = getTableDescriptorFromFs(fs, getTableDir(tableName), fsreadonly).map(Pair::getSecond)
        .orElse(null);
    } catch (IOException ioe) {
      LOG.debug("Exception during readTableDecriptor. Current table name = {}", tableName, ioe);
    }
    // last HTD written wins
    if (usecache && tdmt != null) {
      this.cache.put(tableName, tdmt);
    }

    return tdmt;
  }

  /**
   * Returns a map from table name to table descriptor for all tables.
   * <p>
   * Served from the cache once a previous call has loaded every table directory; otherwise the
   * table directories are listed and each descriptor is loaded through {@link #get(TableName)},
   * on the load pool when parallel loading is enabled.
   * @throws InterruptedIOException if interrupted while waiting for the parallel load to finish
   */
  @Override
  public Map<String, TableDescriptor> getAll() throws IOException {
    Map<String, TableDescriptor> tds = new ConcurrentSkipListMap<>();
    if (fsvisited) {
      for (Map.Entry<TableName, TableDescriptor> entry : this.cache.entrySet()) {
        tds.put(entry.getKey().getNameWithNamespaceInclAsString(), entry.getValue());
      }
      return tds;
    }
    LOG.info("Fetching table descriptors from the filesystem.");
    final long startTime = EnvironmentEdgeManager.currentTime();
    // Starts out as 'usecache' and is cleared by internalGet for any dir that yields no descriptor.
    AtomicBoolean allvisited = new AtomicBoolean(usecache);
    List<Path> tableDirs =
      FSUtils.getTableDirs(fs, rootdir).stream().filter(FSUtils::isLocalMetaTable).toList();
    if (!tableDescriptorParallelLoadEnable) {
      for (Path dir : tableDirs) {
        internalGet(dir, tds, allvisited);
      }
    } else {
      CountDownLatch latch = new CountDownLatch(tableDirs.size());
      for (Path dir : tableDirs) {
        executor.submit(() -> {
          try {
            internalGet(dir, tds, allvisited);
          } finally {
            // Always count down so a failing task cannot leave the caller waiting forever.
            latch.countDown();
          }
        });
      }
      try {
        latch.await();
      } catch (InterruptedException ie) {
        throw (InterruptedIOException) new InterruptedIOException().initCause(ie);
      }
    }
    fsvisited = allvisited.get();
    LOG.info("Fetched table descriptors(size={}) cost {}ms.", tds.size(),
      EnvironmentEdgeManager.currentTime() - startTime);
    return tds;
  }

  /**
   * Loads the descriptor for the table at {@code dir} into {@code tds}; clears {@code allvisited}
   * when no descriptor could be obtained.
   */
  private void internalGet(Path dir, Map<String, TableDescriptor> tds, AtomicBoolean allvisited) {
    TableDescriptor htd = get(CommonFSUtils.getTableName(dir));
    if (htd == null) {
      allvisited.set(false);
    } else {
      tds.put(htd.getTableName().getNameWithNamespaceInclAsString(), htd);
    }
  }

  /**
   * Find descriptors by namespace. Tables for which no descriptor can be obtained are skipped.
   * @see #get(org.apache.hadoop.hbase.TableName)
   */
  @Override
  public Map<String, TableDescriptor> getByNamespace(String name) throws IOException {
    Map<String, TableDescriptor> htds = new TreeMap<>();
    List<Path> tableDirs =
      FSUtils.getLocalTableDirs(fs, CommonFSUtils.getNamespaceDir(rootdir, name));
    for (Path d : tableDirs) {
      TableDescriptor htd = get(CommonFSUtils.getTableName(d));
      if (htd == null) {
        continue;
      }
      htds.put(CommonFSUtils.getTableName(d).getNameAsString(), htd);
    }
    return htds;
  }

  /**
   * Stores the given descriptor. Unless {@code cacheOnly} is set it is written to the filesystem
   * first; when caching is enabled it is then placed in the cache.
   * @throws UnsupportedOperationException if this instance is read only
   */
  @Override
  public void update(TableDescriptor td, boolean cacheOnly) throws IOException {
    // TODO: in fact this method will only be called at master side, so fsreadonly and usecache will
    // always be true. In general, we'd better have a ReadOnlyFSTableDesciptors for HRegionServer
    // but now, HMaster extends HRegionServer, so unless making use of generic, we can not have
    // different implementations for HMaster and HRegionServer. Revisit this when we make HMaster
    // not extend HRegionServer in the future.
    if (fsreadonly) {
      throw new UnsupportedOperationException("Cannot add a table descriptor - in read only mode");
    }
    if (!cacheOnly) {
      updateTableDescriptor(td);
    }
    if (usecache) {
      this.cache.put(td.getTableName(), td);
    }
  }

  /**
   * Writes the descriptor to the table's directory as a new tableinfo file, passing the current
   * descriptor file (if any) along so its sequence id is the starting point.
   * @return path of the file written
   * @throws IOException if no file could be written
   */
  @RestrictedApi(explanation = "Should only be called in tests or self", link = "",
      allowedOnPath = ".*/src/test/.*|.*/FSTableDescriptors\\.java")
  Path updateTableDescriptor(TableDescriptor td) throws IOException {
    TableName tableName = td.getTableName();
    Path tableDir = getTableDir(tableName);
    Path p = writeTableDescriptor(fs, td, tableDir,
      getTableDescriptorFromFs(fs, tableDir, fsreadonly).map(Pair::getFirst).orElse(null));
    if (p == null) {
      throw new IOException("Failed update");
    }
    LOG.info("Updated tableinfo={}", p);
    return p;
  }

  /**
   * Deletes the entire table directory(!) from the FileSystem if it exists, then removes the table
   * descriptor from the local cache and returns it.
   * @return the descriptor that was cached for the table, or {@code null} if none was cached
   * @throws NotImplementedException if this instance is read only
   * @throws IOException             if the table directory exists but could not be deleted
   */
  @Override
  public TableDescriptor remove(final TableName tablename) throws IOException {
    if (fsreadonly) {
      throw new NotImplementedException("Cannot remove a table descriptor - in read only mode");
    }
    Path tabledir = getTableDir(tablename);
    if (this.fs.exists(tabledir)) {
      if (!this.fs.delete(tabledir, true)) {
        throw new IOException("Failed delete of " + tabledir.toString());
      }
    }
    return this.cache.remove(tablename);
  }

  /**
   * Check whether we have a valid TableDescriptor.
   */
  public static boolean isTableDir(FileSystem fs, Path tableDir) throws IOException {
    return getTableDescriptorFromFs(fs, tableDir, true).isPresent();
  }

  /**
   * Compare {@link FileStatus} instances by {@link Path#getName()}. Returns in reverse order.
   */
  static final Comparator<FileStatus> TABLEINFO_FILESTATUS_COMPARATOR =
    (left, right) -> right.getPath().getName().compareTo(left.getPath().getName());

  /**
   * Return the table directory in HDFS
   */
  private Path getTableDir(TableName tableName) {
    return CommonFSUtils.getTableDir(rootdir, tableName);
  }

  // Accept any file whose name starts with TABLEINFO_FILE_PREFIX
  private static final PathFilter TABLEINFO_PATHFILTER =
    p -> p.getName().startsWith(TABLEINFO_FILE_PREFIX);

  /**
   * Width of the sequenceid that is a suffix on a tableinfo file.
   */
  static final int WIDTH_OF_SEQUENCE_ID = 10;

  /**
   * @param number Number to use as suffix.
   * @return Returns zero-prefixed decimal version of passed number (Does absolute in case number is
   *         negative).
   */
  private static String formatTableInfoSequenceId(final int number) {
    byte[] b = new byte[WIDTH_OF_SEQUENCE_ID];
    // Take the absolute value in long: Math.abs(Integer.MIN_VALUE) is still negative as an int and
    // would produce non-digit bytes below.
    long d = Math.abs((long) number);
    for (int i = b.length - 1; i >= 0; i--) {
      b[i] = (byte) ((d % 10) + '0');
      d /= 10;
    }
    return Bytes.toString(b);
  }

  @Override
  public void close() throws IOException {
    // Close the executor when parallel loading enabled.
    if (tableDescriptorParallelLoadEnable) {
      this.executor.shutdown();
    }
  }

  /**
   * Holder for the sequence id and file length encoded in a tableinfo file name.
   */
  static final class SequenceIdAndFileLength {

    final int sequenceId;

    final int fileLength;

    SequenceIdAndFileLength(int sequenceId, int fileLength) {
      this.sequenceId = sequenceId;
      this.fileLength = fileLength;
    }
  }

  /**
   * Returns the sequence id and file length parsed from the file name, using 0 for any part the
   * name does not carry.
   * @param p Path to a <code>.tableinfo</code> file.
   * @throws IllegalArgumentException if the file name does not start with
   *                                  {@link #TABLEINFO_FILE_PREFIX}
   */
  @RestrictedApi(explanation = "Should only be called in tests or self", link = "",
      allowedOnPath = ".*/src/test/.*|.*/FSTableDescriptors\\.java")
  static SequenceIdAndFileLength getTableInfoSequenceIdAndFileLength(Path p) {
    String name = p.getName();
    if (!name.startsWith(TABLEINFO_FILE_PREFIX)) {
      throw new IllegalArgumentException("Invalid table descriptor file name: " + name);
    }
    int firstDot = name.indexOf('.', TABLEINFO_FILE_PREFIX.length());
    if (firstDot < 0) {
      // oldest style where we do not have both sequence id and file length
      return new SequenceIdAndFileLength(0, 0);
    }
    int secondDot = name.indexOf('.', firstDot + 1);
    if (secondDot < 0) {
      // old style where we do not have file length
      int sequenceId = Integer.parseInt(name.substring(firstDot + 1));
      return new SequenceIdAndFileLength(sequenceId, 0);
    }
    int sequenceId = Integer.parseInt(name.substring(firstDot + 1, secondDot));
    int fileLength = Integer.parseInt(name.substring(secondDot + 1));
    return new SequenceIdAndFileLength(sequenceId, fileLength);
  }

  /**
   * Returns Name of tableinfo file: the prefix, the zero-padded sequence id and the content length,
   * separated by dots.
   */
  @RestrictedApi(explanation = "Should only be called in tests or self", link = "",
      allowedOnPath = ".*/src/test/.*|.*/FSTableDescriptors\\.java")
  static String getTableInfoFileName(int sequenceId, byte[] content) {
    return TABLEINFO_FILE_PREFIX + "." + formatTableInfoSequenceId(sequenceId) + "."
      + content.length;
  }

  /**
   * Returns the latest table descriptor for the given table directly from the file system if it
   * exists, bypassing the local cache. Returns null if it's not found.
   */
  public static TableDescriptor getTableDescriptorFromFs(FileSystem fs, Path hbaseRootDir,
    TableName tableName) throws IOException {
    Path tableDir = CommonFSUtils.getTableDir(hbaseRootDir, tableName);
    return getTableDescriptorFromFs(fs, tableDir);
  }

  /**
   * Returns the latest table descriptor for the table located at the given directory directly from
   * the file system if it exists, or null otherwise. Never deletes any file.
   */
  public static TableDescriptor getTableDescriptorFromFs(FileSystem fs, Path tableDir)
    throws IOException {
    return getTableDescriptorFromFs(fs, tableDir, true).map(Pair::getSecond).orElse(null);
  }

  private static void deleteMalformedFile(FileSystem fs, Path file) throws IOException {
    LOG.info("Delete malformed table descriptor file {}", file);
    if (!fs.delete(file, false)) {
      LOG.warn("Failed to delete malformed table descriptor file {}", file);
    }
  }

  /**
   * Loads the newest readable descriptor from the {@link #TABLEINFO_DIR} subdir of
   * {@code tableDir}. Candidate files are tried from the highest file name downwards; files that
   * are truncated or fail to parse are skipped.
   * @param readonly if {@code false}, unreadable files and files older than the one loaded are
   *                 deleted along the way
   * @return the file the descriptor was loaded from together with the descriptor, or empty if no
   *         descriptor could be loaded
   */
  private static Optional<Pair<FileStatus, TableDescriptor>> getTableDescriptorFromFs(FileSystem fs,
    Path tableDir, boolean readonly) throws IOException {
    Path tableInfoDir = new Path(tableDir, TABLEINFO_DIR);
    FileStatus[] descFiles = CommonFSUtils.listStatus(fs, tableInfoDir, TABLEINFO_PATHFILTER);
    if (descFiles == null || descFiles.length < 1) {
      return Optional.empty();
    }
    Arrays.sort(descFiles, TABLEINFO_FILESTATUS_COMPARATOR);
    int i = 0;
    TableDescriptor td = null;
    FileStatus descFile = null;
    for (; i < descFiles.length; i++) {
      descFile = descFiles[i];
      Path file = descFile.getPath();
      // get file length from file name if present
      int fileLength = getTableInfoSequenceIdAndFileLength(file).fileLength;
      byte[] content = new byte[fileLength > 0 ? fileLength : Ints.checkedCast(descFile.getLen())];
      try (FSDataInputStream in = fs.open(file)) {
        in.readFully(content);
      } catch (EOFException e) {
        LOG.info("Failed to load file {} due to EOF, it should be half written: {}", file,
          e.toString());
        if (!readonly) {
          deleteMalformedFile(fs, file);
        }
        continue;
      }
      try {
        td = TableDescriptorBuilder.parseFrom(content);
        break;
      } catch (DeserializationException e) {
        LOG.info("Failed to parse file {} due to malformed protobuf message: {}", file,
          e.toString());
        if (!readonly) {
          deleteMalformedFile(fs, file);
        }
      }
    }
    if (!readonly) {
      // i + 1 to skip the one we load
      for (i = i + 1; i < descFiles.length; i++) {
        Path file = descFiles[i].getPath();
        LOG.info("Delete old table descriptor file {}", file);
        if (!fs.delete(file, false)) {
          LOG.info("Failed to delete old table descriptor file {}", file);
        }
      }
    }
    return td != null ? Optional.of(Pair.newPair(descFile, td)) : Optional.empty();
  }

  /**
   * Deletes every tableinfo file in the {@link #TABLEINFO_DIR} subdir of the given table
   * directory.
   */
  @RestrictedApi(explanation = "Should only be called in tests", link = "",
      allowedOnPath = ".*/src/test/.*")
  public static void deleteTableDescriptors(FileSystem fs, Path tableDir) throws IOException {
    Path tableInfoDir = new Path(tableDir, TABLEINFO_DIR);
    deleteTableDescriptorFiles(fs, tableInfoDir, Integer.MAX_VALUE);
  }

  /**
   * Deletes files matching the table info file pattern within the given directory whose sequenceId
   * is at most the given max sequenceId. Does nothing if the listing yields no files.
   */
  private static void deleteTableDescriptorFiles(FileSystem fs, Path dir, int maxSequenceId)
    throws IOException {
    FileStatus[] status = CommonFSUtils.listStatus(fs, dir, TABLEINFO_PATHFILTER);
    if (status == null) {
      // The same listing call is null-checked in getTableDescriptorFromFs; treat a null listing as
      // "nothing to delete" instead of failing with a NullPointerException.
      return;
    }
    for (FileStatus file : status) {
      Path path = file.getPath();
      int sequenceId = getTableInfoSequenceIdAndFileLength(path).sequenceId;
      if (sequenceId <= maxSequenceId) {
        boolean success = CommonFSUtils.delete(fs, path, false);
        if (success) {
          LOG.debug("Deleted {}", path);
        } else {
          LOG.error("Failed to delete table descriptor at {}", path);
        }
      }
    }
  }

  /**
   * Attempts to write a new table descriptor to the given table's directory. It begins at the
   * currentSequenceId + 1 and tries 10 times to find a new sequence number not already in use.
   * <p>
   * After a successful write, removes all descriptor files with a lower sequence id, which
   * includes the current descriptor file if passed in.
   * @return Descriptor file or null if we failed write.
   */
  private static Path writeTableDescriptor(final FileSystem fs, final TableDescriptor td,
    final Path tableDir, final FileStatus currentDescriptorFile) throws IOException {
    // Here we will write to the final directory directly to avoid renaming as on OSS renaming is
    // not atomic and has performance issue. The reason why we could do this is that, in the below
    // code we will not overwrite existing files, we will write a new file instead. And when
    // loading, we will skip the half written file, please see the code in getTableDescriptorFromFs
    Path tableInfoDir = new Path(tableDir, TABLEINFO_DIR);

    // In proc v2 we have table lock so typically, there will be no concurrent writes. Keep the
    // retry logic here since we may still want to write the table descriptor from for example,
    // HBCK2?
    int currentSequenceId = currentDescriptorFile == null
      ? 0
      : getTableInfoSequenceIdAndFileLength(currentDescriptorFile.getPath()).sequenceId;

    // Put arbitrary upperbound on how often we retry
    int maxAttempts = 10;
    int maxSequenceId = currentSequenceId + maxAttempts;
    byte[] bytes = TableDescriptorBuilder.toByteArray(td);
    for (int newSequenceId = currentSequenceId + 1; newSequenceId
        <= maxSequenceId; newSequenceId++) {
      String fileName = getTableInfoFileName(newSequenceId, bytes);
      Path filePath = new Path(tableInfoDir, fileName);
      // overwrite=false: never clobber a file some other writer already created
      try (FSDataOutputStream out = fs.create(filePath, false)) {
        out.write(bytes);
      } catch (FileAlreadyExistsException e) {
        LOG.debug("{} exists; retrying up to {} times", filePath, maxAttempts, e);
        continue;
      } catch (IOException e) {
        LOG.debug("Failed write {}; retrying up to {} times", filePath, maxAttempts, e);
        continue;
      }
      deleteTableDescriptorFiles(fs, tableInfoDir, newSequenceId - 1);
      return filePath;
    }
    return null;
  }

  /**
   * Create new TableDescriptor in HDFS. Happens when we are creating table. Used by tests.
   * @return True if we successfully created file.
   */
  public boolean createTableDescriptor(TableDescriptor htd) throws IOException {
    return createTableDescriptor(htd, false);
  }

  /**
   * Create new TableDescriptor in HDFS. Happens when we are creating table. If forceCreation is
   * true then even if previous table descriptor is present it will be overwritten
   * @return True if we successfully created file.
   */
  public boolean createTableDescriptor(TableDescriptor htd, boolean forceCreation)
    throws IOException {
    Path tableDir = getTableDir(htd.getTableName());
    return createTableDescriptorForTableDirectory(tableDir, htd, forceCreation);
  }

  /**
   * Create a new TableDescriptor in HDFS in the specified table directory. Happens when we create a
   * new table during cluster start or in Clone and Create Table Procedures. Checks readOnly flag
   * passed on construction.
   * @param tableDir      table directory under which we should write the file
   * @param htd           description of the table to write
   * @param forceCreation if {@code true}, then even if previous table descriptor is present it will
   *                      be overwritten
   * @return {@code true} if we successfully created the file, {@code false} if the file already
   *         exists, and we weren't forcing the descriptor creation.
   * @throws IOException if a filesystem error occurs
   */
  public boolean createTableDescriptorForTableDirectory(Path tableDir, TableDescriptor htd,
    boolean forceCreation) throws IOException {
    if (this.fsreadonly) {
      throw new NotImplementedException("Cannot create a table descriptor - in read only mode");
    }
    return createTableDescriptorForTableDirectory(this.fs, tableDir, htd, forceCreation);
  }

  /**
   * Create a new TableDescriptor in HDFS in the specified table directory. Happens when we create a
   * new table snapshoting. Does not enforce read-only. That is for caller to determine.
   * @param fs            Filesystem to use.
   * @param tableDir      table directory under which we should write the file
   * @param htd           description of the table to write
   * @param forceCreation if {@code true}, then even if previous table descriptor is present it will
   *                      be overwritten
   * @return {@code true} if we successfully created the file, {@code false} if an equal descriptor
   *         already exists and we weren't forcing the descriptor creation, or if the write failed.
   * @throws IOException if a filesystem error occurs
   */
  public static boolean createTableDescriptorForTableDirectory(FileSystem fs, Path tableDir,
    TableDescriptor htd, boolean forceCreation) throws IOException {
    Optional<Pair<FileStatus, TableDescriptor>> opt = getTableDescriptorFromFs(fs, tableDir, false);
    if (opt.isPresent()) {
      LOG.debug("Current path={}", opt.get().getFirst());
      if (!forceCreation && htd.equals(opt.get().getSecond())) {
        LOG.trace("TableInfo already exists.. Skipping creation");
        return false;
      }
    }
    return writeTableDescriptor(fs, htd, tableDir, opt.map(Pair::getFirst).orElse(null)) != null;
  }

  /**
   * Invalidates the table descriptor cache.
   */
  @Override
  public void invalidateTableDescriptorCache() {
    LOG.info("Invalidating table descriptor cache.");
    this.fsvisited = false;
    this.cache.clear();
  }
}