001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase; 019 020import edu.umd.cs.findbugs.annotations.NonNull; 021import edu.umd.cs.findbugs.annotations.Nullable; 022import java.io.ByteArrayOutputStream; 023import java.io.Closeable; 024import java.io.IOException; 025import java.util.ArrayList; 026import java.util.Arrays; 027import java.util.Collection; 028import java.util.Collections; 029import java.util.Iterator; 030import java.util.LinkedHashMap; 031import java.util.List; 032import java.util.Map; 033import java.util.NavigableMap; 034import java.util.Objects; 035import java.util.Set; 036import java.util.SortedMap; 037import java.util.regex.Matcher; 038import java.util.regex.Pattern; 039import java.util.stream.Collectors; 040import org.apache.hadoop.conf.Configuration; 041import org.apache.hadoop.hbase.Cell.Type; 042import org.apache.hadoop.hbase.client.Connection; 043import org.apache.hadoop.hbase.client.ConnectionFactory; 044import org.apache.hadoop.hbase.client.Consistency; 045import org.apache.hadoop.hbase.client.Delete; 046import org.apache.hadoop.hbase.client.Get; 047import org.apache.hadoop.hbase.client.Mutation; 048import 
org.apache.hadoop.hbase.client.Put; 049import org.apache.hadoop.hbase.client.RegionInfo; 050import org.apache.hadoop.hbase.client.RegionInfoBuilder; 051import org.apache.hadoop.hbase.client.RegionLocator; 052import org.apache.hadoop.hbase.client.RegionReplicaUtil; 053import org.apache.hadoop.hbase.client.Result; 054import org.apache.hadoop.hbase.client.ResultScanner; 055import org.apache.hadoop.hbase.client.Scan; 056import org.apache.hadoop.hbase.client.Table; 057import org.apache.hadoop.hbase.client.TableState; 058import org.apache.hadoop.hbase.client.coprocessor.Batch; 059import org.apache.hadoop.hbase.exceptions.DeserializationException; 060import org.apache.hadoop.hbase.filter.Filter; 061import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter; 062import org.apache.hadoop.hbase.filter.RowFilter; 063import org.apache.hadoop.hbase.filter.SubstringComparator; 064import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils; 065import org.apache.hadoop.hbase.ipc.ServerRpcController; 066import org.apache.hadoop.hbase.master.RegionState; 067import org.apache.hadoop.hbase.master.RegionState.State; 068import org.apache.hadoop.hbase.protobuf.ProtobufUtil; 069import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; 070import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MultiRowMutationService; 071import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MutateRowsRequest; 072import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MutateRowsResponse; 073import org.apache.hadoop.hbase.util.Bytes; 074import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 075import org.apache.hadoop.hbase.util.ExceptionUtil; 076import org.apache.hadoop.hbase.util.Pair; 077import org.apache.hadoop.hbase.util.PairOfSameType; 078import org.apache.yetus.audience.InterfaceAudience; 079import org.slf4j.Logger; 080import org.slf4j.LoggerFactory; 081 082import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 
083import org.apache.hbase.thirdparty.com.google.common.base.Throwables; 084 085/** 086 * <p> 087 * Read/write operations on <code>hbase:meta</code> region as well as assignment information stored 088 * to <code>hbase:meta</code>. 089 * </p> 090 * <p> 091 * Some of the methods of this class take ZooKeeperWatcher as a param. The only reason for this is 092 * when this class is used on client-side (e.g. HBaseAdmin), we want to use short-lived connection 093 * (opened before each operation, closed right after), while when used on HM or HRS (like in 094 * AssignmentManager) we want permanent connection. 095 * </p> 096 * <p> 097 * HBASE-10070 adds a replicaId to HRI, meaning more than one HRI can be defined for the same table 098 * range (table, startKey, endKey). For every range, there will be at least one HRI defined which is 099 * called default replica. 100 * </p> 101 * <p> 102 * <h2>Meta layout</h2> 103 * 104 * <pre> 105 * For each table there is single row named for the table with a 'table' column family. 106 * The column family currently has one column in it, the 'state' column: 107 * 108 * table:state => contains table state 109 * 110 * Then for each table range ('Region'), there is a single row, formatted as: 111 * <tableName>,<startKey>,<regionId>,<encodedRegionName>. 112 * This row is the serialized regionName of the default region replica. 
 * Columns are:
 *   info:regioninfo                    => contains serialized HRI for the default region replica
 *   info:server                        => contains hostname:port (in string form) for the server hosting
 *                                         the default regionInfo replica
 *   info:server_<replicaId>            => contains hostname:port (in string form) for the server hosting
 *                                         the regionInfo replica with replicaId
 *   info:serverstartcode               => contains server start code (in binary long form) for the server
 *                                         hosting the default regionInfo replica
 *   info:serverstartcode_<replicaId>   => contains server start code (in binary long form) for
 *                                         the server hosting the regionInfo replica with
 *                                         replicaId
 *   info:seqnumDuringOpen              => contains seqNum (in binary long form) for the region at the time
 *                                         the server opened the region with default replicaId
 *   info:seqnumDuringOpen_<replicaId>  => contains seqNum (in binary long form) for the region
 *                                         at the time the server opened the region with
 *                                         replicaId
 *   info:splitA                        => contains a serialized HRI for the first daughter region if the
 *                                         region is split
 *   info:splitB                        => contains a serialized HRI for the second daughter region if the
 *                                         region is split
 *   info:merge*                        => contains a serialized HRI for a merge parent region. There will be two
 *                                         or more of these columns in a row. A row that has these columns is
 *                                         undergoing a merge and is the result of the merge. Columns listed
 *                                         in merge* columns are the parents of this merged region. Example
 *                                         columns: info:merge0001, info:merge0002. You may also see 'mergeA',
 *                                         and 'mergeB'. This is the old form replaced by the new format that allows
 *                                         for more than two parents to be merged at a time.
 * TODO: Add rep_barrier for serial replication explanation. See SerialReplicationChecker.
141 * </pre> 142 * </p> 143 * <p> 144 * The actual layout of meta should be encapsulated inside MetaTableAccessor methods, and should not 145 * leak out of it (through Result objects, etc) 146 * </p> 147 */ 148@InterfaceAudience.Private 149public class MetaTableAccessor { 150 151 private static final Logger LOG = LoggerFactory.getLogger(MetaTableAccessor.class); 152 private static final Logger METALOG = LoggerFactory.getLogger("org.apache.hadoop.hbase.META"); 153 154 @VisibleForTesting 155 public static final byte[] REPLICATION_PARENT_QUALIFIER = Bytes.toBytes("parent"); 156 157 private static final byte ESCAPE_BYTE = (byte) 0xFF; 158 159 private static final byte SEPARATED_BYTE = 0x00; 160 161 @InterfaceAudience.Private 162 public enum QueryType { 163 ALL(HConstants.TABLE_FAMILY, HConstants.CATALOG_FAMILY), 164 REGION(HConstants.CATALOG_FAMILY), 165 TABLE(HConstants.TABLE_FAMILY), 166 REPLICATION(HConstants.REPLICATION_BARRIER_FAMILY); 167 168 private final byte[][] families; 169 170 QueryType(byte[]... families) { 171 this.families = families; 172 } 173 174 byte[][] getFamilies() { 175 return this.families; 176 } 177 } 178 179 /** The delimiter for meta columns for replicaIds > 0 */ 180 static final char META_REPLICA_ID_DELIMITER = '_'; 181 182 /** A regex for parsing server columns from meta. See above javadoc for meta layout */ 183 private static final Pattern SERVER_COLUMN_PATTERN 184 = Pattern.compile("^server(_[0-9a-fA-F]{4})?$"); 185 186 //////////////////////// 187 // Reading operations // 188 //////////////////////// 189 190 /** 191 * Performs a full scan of <code>hbase:meta</code> for regions. 192 * @param connection connection we're using 193 * @param visitor Visitor invoked against each row in regions family. 
194 */ 195 public static void fullScanRegions(Connection connection, final Visitor visitor) 196 throws IOException { 197 scanMeta(connection, null, null, QueryType.REGION, visitor); 198 } 199 200 /** 201 * Performs a full scan of <code>hbase:meta</code> for regions. 202 * @param connection connection we're using 203 */ 204 public static List<Result> fullScanRegions(Connection connection) throws IOException { 205 return fullScan(connection, QueryType.REGION); 206 } 207 208 /** 209 * Performs a full scan of <code>hbase:meta</code> for tables. 210 * @param connection connection we're using 211 * @param visitor Visitor invoked against each row in tables family. 212 */ 213 public static void fullScanTables(Connection connection, final Visitor visitor) 214 throws IOException { 215 scanMeta(connection, null, null, QueryType.TABLE, visitor); 216 } 217 218 /** 219 * Performs a full scan of <code>hbase:meta</code>. 220 * @param connection connection we're using 221 * @param type scanned part of meta 222 * @return List of {@link Result} 223 */ 224 private static List<Result> fullScan(Connection connection, QueryType type) throws IOException { 225 CollectAllVisitor v = new CollectAllVisitor(); 226 scanMeta(connection, null, null, type, v); 227 return v.getResults(); 228 } 229 230 /** 231 * Callers should call close on the returned {@link Table} instance. 
232 * @param connection connection we're using to access Meta 233 * @return An {@link Table} for <code>hbase:meta</code> 234 * @throws NullPointerException if {@code connection} is {@code null} 235 */ 236 public static Table getMetaHTable(final Connection connection) 237 throws IOException { 238 // We used to pass whole CatalogTracker in here, now we just pass in Connection 239 Objects.requireNonNull(connection, "Connection cannot be null"); 240 if (connection.isClosed()) { 241 throw new IOException("connection is closed"); 242 } 243 return connection.getTable(TableName.META_TABLE_NAME); 244 } 245 246 /** 247 * @param t Table to use (will be closed when done). 248 * @param g Get to run 249 */ 250 private static Result get(final Table t, final Get g) throws IOException { 251 if (t == null) return null; 252 try { 253 return t.get(g); 254 } finally { 255 t.close(); 256 } 257 } 258 259 /** 260 * Gets the region info and assignment for the specified region. 261 * @param connection connection we're using 262 * @param regionName Region to lookup. 263 * @return Location and RegionInfo for <code>regionName</code> 264 * @deprecated use {@link #getRegionLocation(Connection, byte[])} instead 265 */ 266 @Deprecated 267 public static Pair<RegionInfo, ServerName> getRegion(Connection connection, byte [] regionName) 268 throws IOException { 269 HRegionLocation location = getRegionLocation(connection, regionName); 270 return location == null 271 ? 
null 272 : new Pair<>(location.getRegion(), location.getServerName()); 273 } 274 275 /** 276 * Returns the HRegionLocation from meta for the given region 277 * @param connection connection we're using 278 * @param regionName region we're looking for 279 * @return HRegionLocation for the given region 280 */ 281 public static HRegionLocation getRegionLocation(Connection connection, byte[] regionName) 282 throws IOException { 283 byte[] row = regionName; 284 RegionInfo parsedInfo = null; 285 try { 286 parsedInfo = parseRegionInfoFromRegionName(regionName); 287 row = getMetaKeyForRegion(parsedInfo); 288 } catch (Exception parseEx) { 289 // Ignore. This is used with tableName passed as regionName. 290 } 291 Get get = new Get(row); 292 get.addFamily(HConstants.CATALOG_FAMILY); 293 Result r = get(getMetaHTable(connection), get); 294 RegionLocations locations = getRegionLocations(r); 295 return locations == null ? null 296 : locations.getRegionLocation(parsedInfo == null ? 0 : parsedInfo.getReplicaId()); 297 } 298 299 /** 300 * Returns the HRegionLocation from meta for the given region 301 * @param connection connection we're using 302 * @param regionInfo region information 303 * @return HRegionLocation for the given region 304 */ 305 public static HRegionLocation getRegionLocation(Connection connection, RegionInfo regionInfo) 306 throws IOException { 307 byte[] row = getMetaKeyForRegion(regionInfo); 308 Get get = new Get(row); 309 get.addFamily(HConstants.CATALOG_FAMILY); 310 Result r = get(getMetaHTable(connection), get); 311 return getRegionLocation(r, regionInfo, regionInfo.getReplicaId()); 312 } 313 314 /** Returns the row key to use for this regionInfo */ 315 public static byte[] getMetaKeyForRegion(RegionInfo regionInfo) { 316 return RegionReplicaUtil.getRegionInfoForDefaultReplica(regionInfo).getRegionName(); 317 } 318 319 /** Returns an HRI parsed from this regionName. 
Not all the fields of the HRI 320 * is stored in the name, so the returned object should only be used for the fields 321 * in the regionName. 322 */ 323 public static RegionInfo parseRegionInfoFromRegionName(byte[] regionName) throws IOException { 324 byte[][] fields = RegionInfo.parseRegionName(regionName); 325 long regionId = Long.parseLong(Bytes.toString(fields[2])); 326 int replicaId = fields.length > 3 ? Integer.parseInt(Bytes.toString(fields[3]), 16) : 0; 327 return RegionInfoBuilder.newBuilder(TableName.valueOf(fields[0])) 328 .setStartKey(fields[1]) 329 .setEndKey(fields[2]) 330 .setSplit(false) 331 .setRegionId(regionId) 332 .setReplicaId(replicaId) 333 .build(); 334 } 335 336 /** 337 * Gets the result in hbase:meta for the specified region. 338 * @param connection connection we're using 339 * @param regionName region we're looking for 340 * @return result of the specified region 341 */ 342 public static Result getRegionResult(Connection connection, 343 byte[] regionName) throws IOException { 344 Get get = new Get(regionName); 345 get.addFamily(HConstants.CATALOG_FAMILY); 346 return get(getMetaHTable(connection), get); 347 } 348 349 /** 350 * Scans META table for a row whose key contains the specified <B>regionEncodedName</B>, 351 * returning a single related <code>Result</code> instance if any row is found, null otherwise. 352 * 353 * @param connection the connection to query META table. 354 * @param regionEncodedName the region encoded name to look for at META. 355 * @return <code>Result</code> instance with the row related info in META, null otherwise. 356 * @throws IOException if any errors occur while querying META. 
357 */ 358 public static Result scanByRegionEncodedName(Connection connection, 359 String regionEncodedName) throws IOException { 360 RowFilter rowFilter = new RowFilter(CompareOperator.EQUAL, 361 new SubstringComparator(regionEncodedName)); 362 Scan scan = getMetaScan(connection, 1); 363 scan.setFilter(rowFilter); 364 ResultScanner resultScanner = getMetaHTable(connection).getScanner(scan); 365 return resultScanner.next(); 366 } 367 368 /** 369 * @return Return all regioninfos listed in the 'info:merge*' columns of 370 * the <code>regionName</code> row. 371 */ 372 @Nullable 373 public static List<RegionInfo> getMergeRegions(Connection connection, byte[] regionName) 374 throws IOException { 375 return getMergeRegions(getRegionResult(connection, regionName).rawCells()); 376 } 377 378 /** 379 * @return Deserialized regioninfo values taken from column values that match 380 * the regex 'info:merge.*' in array of <code>cells</code>. 381 */ 382 @Nullable 383 public static List<RegionInfo> getMergeRegions(Cell [] cells) { 384 if (cells == null) { 385 return null; 386 } 387 List<RegionInfo> regionsToMerge = null; 388 for (Cell cell: cells) { 389 if (!isMergeQualifierPrefix(cell)) { 390 continue; 391 } 392 // Ok. This cell is that of a info:merge* column. 393 RegionInfo ri = RegionInfo.parseFromOrNull(cell.getValueArray(), cell.getValueOffset(), 394 cell.getValueLength()); 395 if (ri != null) { 396 if (regionsToMerge == null) { 397 regionsToMerge = new ArrayList<>(); 398 } 399 regionsToMerge.add(ri); 400 } 401 } 402 return regionsToMerge; 403 } 404 405 /** 406 * @return True if any merge regions present in <code>cells</code>; i.e. 407 * the column in <code>cell</code> matches the regex 'info:merge.*'. 
408 */ 409 public static boolean hasMergeRegions(Cell [] cells) { 410 for (Cell cell: cells) { 411 if (!isMergeQualifierPrefix(cell)) { 412 continue; 413 } 414 return true; 415 } 416 return false; 417 } 418 419 /** 420 * @return True if the column in <code>cell</code> matches the regex 'info:merge.*'. 421 */ 422 private static boolean isMergeQualifierPrefix(Cell cell) { 423 // Check to see if has family and that qualifier starts with the merge qualifier 'merge' 424 return CellUtil.matchingFamily(cell, HConstants.CATALOG_FAMILY) && 425 PrivateCellUtil.qualifierStartsWith(cell, HConstants.MERGE_QUALIFIER_PREFIX); 426 } 427 428 /** 429 * Checks if the specified table exists. Looks at the hbase:meta table hosted on 430 * the specified server. 431 * @param connection connection we're using 432 * @param tableName table to check 433 * @return true if the table exists in meta, false if not 434 */ 435 public static boolean tableExists(Connection connection, 436 final TableName tableName) 437 throws IOException { 438 // Catalog tables always exist. 439 return tableName.equals(TableName.META_TABLE_NAME) || 440 getTableState(connection, tableName) != null; 441 } 442 443 /** 444 * Lists all of the regions currently in META. 445 * 446 * @param connection to connect with 447 * @param excludeOfflinedSplitParents False if we are to include offlined/splitparents regions, 448 * true and we'll leave out offlined regions from returned list 449 * @return List of all user-space regions. 450 */ 451 @VisibleForTesting 452 public static List<RegionInfo> getAllRegions(Connection connection, 453 boolean excludeOfflinedSplitParents) 454 throws IOException { 455 List<Pair<RegionInfo, ServerName>> result; 456 457 result = getTableRegionsAndLocations(connection, null, 458 excludeOfflinedSplitParents); 459 460 return getListOfRegionInfos(result); 461 462 } 463 464 /** 465 * Gets all of the regions of the specified table. 
Do not use this method 466 * to get meta table regions, use methods in MetaTableLocator instead. 467 * @param connection connection we're using 468 * @param tableName table we're looking for 469 * @return Ordered list of {@link RegionInfo}. 470 */ 471 public static List<RegionInfo> getTableRegions(Connection connection, TableName tableName) 472 throws IOException { 473 return getTableRegions(connection, tableName, false); 474 } 475 476 /** 477 * Gets all of the regions of the specified table. Do not use this method 478 * to get meta table regions, use methods in MetaTableLocator instead. 479 * @param connection connection we're using 480 * @param tableName table we're looking for 481 * @param excludeOfflinedSplitParents If true, do not include offlined split 482 * parents in the return. 483 * @return Ordered list of {@link RegionInfo}. 484 */ 485 public static List<RegionInfo> getTableRegions(Connection connection, TableName tableName, 486 final boolean excludeOfflinedSplitParents) throws IOException { 487 List<Pair<RegionInfo, ServerName>> result = 488 getTableRegionsAndLocations(connection, tableName, excludeOfflinedSplitParents); 489 return getListOfRegionInfos(result); 490 } 491 492 private static List<RegionInfo> getListOfRegionInfos( 493 final List<Pair<RegionInfo, ServerName>> pairs) { 494 if (pairs == null || pairs.isEmpty()) { 495 return Collections.emptyList(); 496 } 497 List<RegionInfo> result = new ArrayList<>(pairs.size()); 498 for (Pair<RegionInfo, ServerName> pair : pairs) { 499 result.add(pair.getFirst()); 500 } 501 return result; 502 } 503 504 /** 505 * @param tableName table we're working with 506 * @return start row for scanning META according to query type 507 */ 508 public static byte[] getTableStartRowForMeta(TableName tableName, QueryType type) { 509 if (tableName == null) { 510 return null; 511 } 512 switch (type) { 513 case REGION: 514 byte[] startRow = new byte[tableName.getName().length + 2]; 515 System.arraycopy(tableName.getName(), 0, 
startRow, 0, tableName.getName().length); 516 startRow[startRow.length - 2] = HConstants.DELIMITER; 517 startRow[startRow.length - 1] = HConstants.DELIMITER; 518 return startRow; 519 case ALL: 520 case TABLE: 521 default: 522 return tableName.getName(); 523 } 524 } 525 526 /** 527 * @param tableName table we're working with 528 * @return stop row for scanning META according to query type 529 */ 530 public static byte[] getTableStopRowForMeta(TableName tableName, QueryType type) { 531 if (tableName == null) { 532 return null; 533 } 534 final byte[] stopRow; 535 switch (type) { 536 case REGION: 537 stopRow = new byte[tableName.getName().length + 3]; 538 System.arraycopy(tableName.getName(), 0, stopRow, 0, tableName.getName().length); 539 stopRow[stopRow.length - 3] = ' '; 540 stopRow[stopRow.length - 2] = HConstants.DELIMITER; 541 stopRow[stopRow.length - 1] = HConstants.DELIMITER; 542 break; 543 case ALL: 544 case TABLE: 545 default: 546 stopRow = new byte[tableName.getName().length + 1]; 547 System.arraycopy(tableName.getName(), 0, stopRow, 0, tableName.getName().length); 548 stopRow[stopRow.length - 1] = ' '; 549 break; 550 } 551 return stopRow; 552 } 553 554 /** 555 * This method creates a Scan object that will only scan catalog rows that 556 * belong to the specified table. It doesn't specify any columns. 557 * This is a better alternative to just using a start row and scan until 558 * it hits a new table since that requires parsing the HRI to get the table 559 * name. 
560 * @param tableName bytes of table's name 561 * @return configured Scan object 562 */ 563 @Deprecated 564 public static Scan getScanForTableName(Connection connection, TableName tableName) { 565 // Start key is just the table name with delimiters 566 byte[] startKey = getTableStartRowForMeta(tableName, QueryType.REGION); 567 // Stop key appends the smallest possible char to the table name 568 byte[] stopKey = getTableStopRowForMeta(tableName, QueryType.REGION); 569 570 Scan scan = getMetaScan(connection, -1); 571 scan.setStartRow(startKey); 572 scan.setStopRow(stopKey); 573 return scan; 574 } 575 576 private static Scan getMetaScan(Connection connection, int rowUpperLimit) { 577 Scan scan = new Scan(); 578 int scannerCaching = connection.getConfiguration() 579 .getInt(HConstants.HBASE_META_SCANNER_CACHING, 580 HConstants.DEFAULT_HBASE_META_SCANNER_CACHING); 581 if (connection.getConfiguration().getBoolean(HConstants.USE_META_REPLICAS, 582 HConstants.DEFAULT_USE_META_REPLICAS)) { 583 scan.setConsistency(Consistency.TIMELINE); 584 } 585 if (rowUpperLimit > 0) { 586 scan.setLimit(rowUpperLimit); 587 scan.setReadType(Scan.ReadType.PREAD); 588 } 589 scan.setCaching(scannerCaching); 590 return scan; 591 } 592 /** 593 * Do not use this method to get meta table regions, use methods in MetaTableLocator instead. 594 * @param connection connection we're using 595 * @param tableName table we're looking for 596 * @return Return list of regioninfos and server. 597 */ 598 public static List<Pair<RegionInfo, ServerName>> 599 getTableRegionsAndLocations(Connection connection, TableName tableName) 600 throws IOException { 601 return getTableRegionsAndLocations(connection, tableName, true); 602 } 603 604 /** 605 * Do not use this method to get meta table regions, use methods in MetaTableLocator instead. 
606 * @param connection connection we're using 607 * @param tableName table to work with, can be null for getting all regions 608 * @param excludeOfflinedSplitParents don't return split parents 609 * @return Return list of regioninfos and server addresses. 610 */ 611 // What happens here when 1M regions in hbase:meta? This won't scale? 612 public static List<Pair<RegionInfo, ServerName>> getTableRegionsAndLocations( 613 Connection connection, @Nullable final TableName tableName, 614 final boolean excludeOfflinedSplitParents) throws IOException { 615 if (tableName != null && tableName.equals(TableName.META_TABLE_NAME)) { 616 throw new IOException("This method can't be used to locate meta regions;" 617 + " use MetaTableLocator instead"); 618 } 619 // Make a version of CollectingVisitor that collects RegionInfo and ServerAddress 620 CollectingVisitor<Pair<RegionInfo, ServerName>> visitor = 621 new CollectingVisitor<Pair<RegionInfo, ServerName>>() { 622 private RegionLocations current = null; 623 624 @Override 625 public boolean visit(Result r) throws IOException { 626 current = getRegionLocations(r); 627 if (current == null || current.getRegionLocation().getRegion() == null) { 628 LOG.warn("No serialized RegionInfo in " + r); 629 return true; 630 } 631 RegionInfo hri = current.getRegionLocation().getRegion(); 632 if (excludeOfflinedSplitParents && hri.isSplitParent()) return true; 633 // Else call super and add this Result to the collection. 
634 return super.visit(r); 635 } 636 637 @Override 638 void add(Result r) { 639 if (current == null) { 640 return; 641 } 642 for (HRegionLocation loc : current.getRegionLocations()) { 643 if (loc != null) { 644 this.results.add(new Pair<>(loc.getRegion(), loc.getServerName())); 645 } 646 } 647 } 648 }; 649 scanMeta(connection, 650 getTableStartRowForMeta(tableName, QueryType.REGION), 651 getTableStopRowForMeta(tableName, QueryType.REGION), 652 QueryType.REGION, visitor); 653 return visitor.getResults(); 654 } 655 656 public static void fullScanMetaAndPrint(Connection connection) 657 throws IOException { 658 Visitor v = r -> { 659 if (r == null || r.isEmpty()) { 660 return true; 661 } 662 LOG.info("fullScanMetaAndPrint.Current Meta Row: " + r); 663 TableState state = getTableState(r); 664 if (state != null) { 665 LOG.info("fullScanMetaAndPrint.Table State={}" + state); 666 } else { 667 RegionLocations locations = getRegionLocations(r); 668 if (locations == null) { 669 return true; 670 } 671 for (HRegionLocation loc : locations.getRegionLocations()) { 672 if (loc != null) { 673 LOG.info("fullScanMetaAndPrint.HRI Print={}", loc.getRegion()); 674 } 675 } 676 } 677 return true; 678 }; 679 scanMeta(connection, null, null, QueryType.ALL, v); 680 } 681 682 public static void scanMetaForTableRegions(Connection connection, Visitor visitor, 683 TableName tableName) throws IOException { 684 scanMeta(connection, tableName, QueryType.REGION, Integer.MAX_VALUE, visitor); 685 } 686 687 private static void scanMeta(Connection connection, TableName table, QueryType type, int maxRows, 688 final Visitor visitor) throws IOException { 689 scanMeta(connection, getTableStartRowForMeta(table, type), getTableStopRowForMeta(table, type), 690 type, maxRows, visitor); 691 } 692 693 private static void scanMeta(Connection connection, @Nullable final byte[] startRow, 694 @Nullable final byte[] stopRow, QueryType type, final Visitor visitor) throws IOException { 695 scanMeta(connection, startRow, 
stopRow, type, Integer.MAX_VALUE, visitor); 696 } 697 698 /** 699 * Performs a scan of META table for given table starting from given row. 700 * @param connection connection we're using 701 * @param visitor visitor to call 702 * @param tableName table withing we scan 703 * @param row start scan from this row 704 * @param rowLimit max number of rows to return 705 */ 706 public static void scanMeta(Connection connection, final Visitor visitor, 707 final TableName tableName, final byte[] row, final int rowLimit) throws IOException { 708 byte[] startRow = null; 709 byte[] stopRow = null; 710 if (tableName != null) { 711 startRow = getTableStartRowForMeta(tableName, QueryType.REGION); 712 if (row != null) { 713 RegionInfo closestRi = getClosestRegionInfo(connection, tableName, row); 714 startRow = 715 RegionInfo.createRegionName(tableName, closestRi.getStartKey(), HConstants.ZEROES, false); 716 } 717 stopRow = getTableStopRowForMeta(tableName, QueryType.REGION); 718 } 719 scanMeta(connection, startRow, stopRow, QueryType.REGION, rowLimit, visitor); 720 } 721 722 /** 723 * Performs a scan of META table. 724 * @param connection connection we're using 725 * @param startRow Where to start the scan. Pass null if want to begin scan 726 * at first row. 727 * @param stopRow Where to stop the scan. Pass null if want to scan all rows 728 * from the start one 729 * @param type scanned part of meta 730 * @param maxRows maximum rows to return 731 * @param visitor Visitor invoked against each row. 
732 */ 733 static void scanMeta(Connection connection, @Nullable final byte[] startRow, 734 @Nullable final byte[] stopRow, QueryType type, int maxRows, final Visitor visitor) 735 throws IOException { 736 scanMeta(connection, startRow, stopRow, type, null, maxRows, visitor); 737 } 738 739 private static void scanMeta(Connection connection, @Nullable final byte[] startRow, 740 @Nullable final byte[] stopRow, QueryType type, @Nullable Filter filter, int maxRows, 741 final Visitor visitor) throws IOException { 742 int rowUpperLimit = maxRows > 0 ? maxRows : Integer.MAX_VALUE; 743 Scan scan = getMetaScan(connection, rowUpperLimit); 744 745 for (byte[] family : type.getFamilies()) { 746 scan.addFamily(family); 747 } 748 if (startRow != null) { 749 scan.withStartRow(startRow); 750 } 751 if (stopRow != null) { 752 scan.withStopRow(stopRow); 753 } 754 if (filter != null) { 755 scan.setFilter(filter); 756 } 757 758 if (LOG.isTraceEnabled()) { 759 LOG.trace("Scanning META" + " starting at row=" + Bytes.toStringBinary(startRow) + 760 " stopping at row=" + Bytes.toStringBinary(stopRow) + " for max=" + rowUpperLimit + 761 " with caching=" + scan.getCaching()); 762 } 763 764 int currentRow = 0; 765 try (Table metaTable = getMetaHTable(connection)) { 766 try (ResultScanner scanner = metaTable.getScanner(scan)) { 767 Result data; 768 while ((data = scanner.next()) != null) { 769 if (data.isEmpty()) continue; 770 // Break if visit returns false. 
771 if (!visitor.visit(data)) break; 772 if (++currentRow >= rowUpperLimit) break; 773 } 774 } 775 } 776 if (visitor instanceof Closeable) { 777 try { 778 ((Closeable) visitor).close(); 779 } catch (Throwable t) { 780 ExceptionUtil.rethrowIfInterrupt(t); 781 LOG.debug("Got exception in closing the meta scanner visitor", t); 782 } 783 } 784 } 785 786 /** 787 * @return Get closest metatable region row to passed <code>row</code> 788 */ 789 @NonNull 790 private static RegionInfo getClosestRegionInfo(Connection connection, 791 @NonNull final TableName tableName, @NonNull final byte[] row) throws IOException { 792 byte[] searchRow = RegionInfo.createRegionName(tableName, row, HConstants.NINES, false); 793 Scan scan = getMetaScan(connection, 1); 794 scan.setReversed(true); 795 scan.withStartRow(searchRow); 796 try (ResultScanner resultScanner = getMetaHTable(connection).getScanner(scan)) { 797 Result result = resultScanner.next(); 798 if (result == null) { 799 throw new TableNotFoundException("Cannot find row in META " + 800 " for table: " + tableName + ", row=" + Bytes.toStringBinary(row)); 801 } 802 RegionInfo regionInfo = getRegionInfo(result); 803 if (regionInfo == null) { 804 throw new IOException("RegionInfo was null or empty in Meta for " + 805 tableName + ", row=" + Bytes.toStringBinary(row)); 806 } 807 return regionInfo; 808 } 809 } 810 811 /** 812 * Returns the column family used for meta columns. 813 * @return HConstants.CATALOG_FAMILY. 814 */ 815 public static byte[] getCatalogFamily() { 816 return HConstants.CATALOG_FAMILY; 817 } 818 819 /** 820 * Returns the column family used for table columns. 821 * @return HConstants.TABLE_FAMILY. 
   */
  private static byte[] getTableFamily() {
    return HConstants.TABLE_FAMILY;
  }

  /**
   * Returns the column qualifier for serialized region info
   * @return HConstants.REGIONINFO_QUALIFIER
   */
  public static byte[] getRegionInfoColumn() {
    return HConstants.REGIONINFO_QUALIFIER;
  }

  /**
   * Returns the column qualifier for serialized table state
   * @return HConstants.TABLE_STATE_QUALIFIER
   */
  private static byte[] getTableStateColumn() {
    return HConstants.TABLE_STATE_QUALIFIER;
  }

  /**
   * Returns the column qualifier for serialized region state
   * @return HConstants.STATE_QUALIFIER
   */
  private static byte[] getRegionStateColumn() {
    return HConstants.STATE_QUALIFIER;
  }

  /**
   * Returns the column qualifier for serialized region state.
   * For replicaId 0 the plain qualifier is used; other replicas get a
   * "_&lt;hex replicaId&gt;" style suffix (see REPLICA_ID_FORMAT).
   * @param replicaId the replicaId of the region
   * @return a byte[] for state qualifier
   */
  @VisibleForTesting
  static byte[] getRegionStateColumn(int replicaId) {
    return replicaId == 0 ? HConstants.STATE_QUALIFIER
        : Bytes.toBytes(HConstants.STATE_QUALIFIER_STR + META_REPLICA_ID_DELIMITER
            + String.format(RegionInfo.REPLICA_ID_FORMAT, replicaId));
  }

  /**
   * Returns the column qualifier for serialized server name, suffixed for non-default replicas.
   * @param replicaId the replicaId of the region
   * @return a byte[] for sn column qualifier
   */
  @VisibleForTesting
  static byte[] getServerNameColumn(int replicaId) {
    return replicaId == 0 ? HConstants.SERVERNAME_QUALIFIER
        : Bytes.toBytes(HConstants.SERVERNAME_QUALIFIER_STR + META_REPLICA_ID_DELIMITER
            + String.format(RegionInfo.REPLICA_ID_FORMAT, replicaId));
  }

  /**
   * Returns the column qualifier for server column for replicaId
   * @param replicaId the replicaId of the region
   * @return a byte[] for server column qualifier
   */
  @VisibleForTesting
  public static byte[] getServerColumn(int replicaId) {
    return replicaId == 0
      ? HConstants.SERVER_QUALIFIER
      : Bytes.toBytes(HConstants.SERVER_QUALIFIER_STR + META_REPLICA_ID_DELIMITER
          + String.format(RegionInfo.REPLICA_ID_FORMAT, replicaId));
  }

  /**
   * Returns the column qualifier for server start code column for replicaId
   * @param replicaId the replicaId of the region
   * @return a byte[] for server start code column qualifier
   */
  @VisibleForTesting
  public static byte[] getStartCodeColumn(int replicaId) {
    return replicaId == 0
      ? HConstants.STARTCODE_QUALIFIER
      : Bytes.toBytes(HConstants.STARTCODE_QUALIFIER_STR + META_REPLICA_ID_DELIMITER
          + String.format(RegionInfo.REPLICA_ID_FORMAT, replicaId));
  }

  /**
   * Returns the column qualifier for seqNum column for replicaId
   * @param replicaId the replicaId of the region
   * @return a byte[] for seqNum column qualifier
   */
  @VisibleForTesting
  public static byte[] getSeqNumColumn(int replicaId) {
    return replicaId == 0
      ? HConstants.SEQNUM_QUALIFIER
      : Bytes.toBytes(HConstants.SEQNUM_QUALIFIER_STR + META_REPLICA_ID_DELIMITER
          + String.format(RegionInfo.REPLICA_ID_FORMAT, replicaId));
  }

  /**
   * Parses the replicaId from the server column qualifier. See top of the class javadoc
   * for the actual meta layout
   * @param serverColumn the column qualifier
   * @return an int for the replicaId; 0 for the default replica (no suffix),
   *         -1 if the qualifier is not a server column at all
   */
  @VisibleForTesting
  static int parseReplicaIdFromServerColumn(byte[] serverColumn) {
    String serverStr = Bytes.toString(serverColumn);

    Matcher matcher = SERVER_COLUMN_PATTERN.matcher(serverStr);
    if (matcher.matches() && matcher.groupCount() > 0) {
      String group = matcher.group(1);
      if (group != null && group.length() > 0) {
        // Suffix is "_<replicaId in hex>"; skip the delimiter char and parse base-16.
        return Integer.parseInt(group.substring(1), 16);
      } else {
        return 0;
      }
    }
    return -1;
  }

  /**
   * Returns a {@link ServerName} from catalog table {@link Result}.
   * @param r Result to pull from
   * @return A ServerName instance or null if necessary fields not found or empty.
   */
  @Nullable
  @InterfaceAudience.Private // for use by HMaster#getTableRegionRow which is used for testing only
  public static ServerName getServerName(final Result r, final int replicaId) {
    byte[] serverColumn = getServerColumn(replicaId);
    Cell cell = r.getColumnLatestCell(getCatalogFamily(), serverColumn);
    if (cell == null || cell.getValueLength() == 0) return null;
    String hostAndPort = Bytes.toString(
      cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
    byte[] startcodeColumn = getStartCodeColumn(replicaId);
    cell = r.getColumnLatestCell(getCatalogFamily(), startcodeColumn);
    if (cell == null || cell.getValueLength() == 0) return null;
    try {
      return ServerName.valueOf(hostAndPort,
        Bytes.toLong(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()));
    } catch (IllegalArgumentException e) {
      // Malformed host:port in meta; treat as "no server" rather than failing the read.
      LOG.error("Ignoring invalid region for server " + hostAndPort + "; cell=" + cell, e);
      return null;
    }
  }

  /**
   * The latest seqnum that the server writing to meta observed when opening the region.
   * E.g. the seqNum when the result of {@link #getServerName(Result, int)} was written.
   * @param r Result to pull the seqNum from
   * @return SeqNum, or HConstants.NO_SEQNUM if there's no value written.
   */
  private static long getSeqNumDuringOpen(final Result r, final int replicaId) {
    Cell cell = r.getColumnLatestCell(getCatalogFamily(), getSeqNumColumn(replicaId));
    if (cell == null || cell.getValueLength() == 0) return HConstants.NO_SEQNUM;
    return Bytes.toLong(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
  }

  /**
   * Returns the daughter regions by reading the corresponding columns of the catalog table
   * Result.
   * @param data a Result object from the catalog table scan
   * @return pair of RegionInfo or PairOfSameType(null, null) if region is not a split parent
   */
  public static PairOfSameType<RegionInfo> getDaughterRegions(Result data) {
    RegionInfo splitA = getRegionInfo(data, HConstants.SPLITA_QUALIFIER);
    RegionInfo splitB = getRegionInfo(data, HConstants.SPLITB_QUALIFIER);
    return new PairOfSameType<>(splitA, splitB);
  }

  /**
   * Returns an HRegionLocationList extracted from the result.
   * @return an HRegionLocationList containing all locations for the region range or null if
   *   we can't deserialize the result.
   */
  @Nullable
  public static RegionLocations getRegionLocations(final Result r) {
    if (r == null) return null;
    RegionInfo regionInfo = getRegionInfo(r, getRegionInfoColumn());
    if (regionInfo == null) return null;

    List<HRegionLocation> locations = new ArrayList<>(1);
    NavigableMap<byte[],NavigableMap<byte[],byte[]>> familyMap = r.getNoVersionMap();

    // The default replica (replicaId 0) is always present in the list.
    locations.add(getRegionLocation(r, regionInfo, 0));

    NavigableMap<byte[], byte[]> infoMap = familyMap.get(getCatalogFamily());
    if (infoMap == null) return new RegionLocations(locations);

    // Iterate the qualifiers sorted after the replica-0 server column until all
    // serverName columns are seen; each one names an additional replica.
    int replicaId = 0;
    byte[] serverColumn = getServerColumn(replicaId);
    SortedMap<byte[], byte[]> serverMap;
    serverMap = infoMap.tailMap(serverColumn, false);

    if (serverMap.isEmpty()) return new RegionLocations(locations);

    for (Map.Entry<byte[], byte[]> entry : serverMap.entrySet()) {
      replicaId = parseReplicaIdFromServerColumn(entry.getKey());
      if (replicaId < 0) {
        // Sorted past the server columns; nothing further can be a replica entry.
        break;
      }
      HRegionLocation location = getRegionLocation(r, regionInfo, replicaId);
      // In case the region replica is newly created, it's location might be null. We usually do not
      // have HRL's in RegionLocations object with null ServerName. They are handled as null HRLs.
      if (location.getServerName() == null) {
        locations.add(null);
      } else {
        locations.add(location);
      }
    }

    return new RegionLocations(locations);
  }

  /**
   * Returns the HRegionLocation parsed from the given meta row Result
   * for the given regionInfo and replicaId. The regionInfo can be the default region info
   * for the replica.
   * @param r the meta row result
   * @param regionInfo RegionInfo for default replica
   * @param replicaId the replicaId for the HRegionLocation
   * @return HRegionLocation parsed from the given meta row Result for the given replicaId
   */
  private static HRegionLocation getRegionLocation(final Result r, final RegionInfo regionInfo,
      final int replicaId) {
    ServerName serverName = getServerName(r, replicaId);
    long seqNum = getSeqNumDuringOpen(r, replicaId);
    RegionInfo replicaInfo = RegionReplicaUtil.getRegionInfoForReplica(regionInfo, replicaId);
    return new HRegionLocation(replicaInfo, serverName, seqNum);
  }

  /**
   * Returns RegionInfo object from the column
   * HConstants.CATALOG_FAMILY:HConstants.REGIONINFO_QUALIFIER of the catalog
   * table Result.
   * @param data a Result object from the catalog table scan
   * @return RegionInfo or null
   */
  public static RegionInfo getRegionInfo(Result data) {
    return getRegionInfo(data, HConstants.REGIONINFO_QUALIFIER);
  }

  /**
   * Returns the RegionInfo object from the column {@link HConstants#CATALOG_FAMILY} and
   * <code>qualifier</code> of the catalog table result.
   * @param r a Result object from the catalog table scan
   * @param qualifier Column family qualifier
   * @return An RegionInfo instance or null.
1064 */ 1065 @Nullable 1066 public static RegionInfo getRegionInfo(final Result r, byte [] qualifier) { 1067 Cell cell = r.getColumnLatestCell(getCatalogFamily(), qualifier); 1068 if (cell == null) return null; 1069 return RegionInfo.parseFromOrNull(cell.getValueArray(), 1070 cell.getValueOffset(), cell.getValueLength()); 1071 } 1072 1073 /** 1074 * Fetch table state for given table from META table 1075 * @param conn connection to use 1076 * @param tableName table to fetch state for 1077 */ 1078 @Nullable 1079 public static TableState getTableState(Connection conn, TableName tableName) 1080 throws IOException { 1081 if (tableName.equals(TableName.META_TABLE_NAME)) { 1082 return new TableState(tableName, TableState.State.ENABLED); 1083 } 1084 Table metaHTable = getMetaHTable(conn); 1085 Get get = new Get(tableName.getName()).addColumn(getTableFamily(), getTableStateColumn()); 1086 Result result = metaHTable.get(get); 1087 return getTableState(result); 1088 } 1089 1090 /** 1091 * Fetch table states from META table 1092 * @param conn connection to use 1093 * @return map {tableName -> state} 1094 */ 1095 public static Map<TableName, TableState> getTableStates(Connection conn) 1096 throws IOException { 1097 final Map<TableName, TableState> states = new LinkedHashMap<>(); 1098 Visitor collector = r -> { 1099 TableState state = getTableState(r); 1100 if (state != null) { 1101 states.put(state.getTableName(), state); 1102 } 1103 return true; 1104 }; 1105 fullScanTables(conn, collector); 1106 return states; 1107 } 1108 1109 /** 1110 * Updates state in META 1111 * @param conn connection to use 1112 * @param tableName table to look for 1113 */ 1114 public static void updateTableState(Connection conn, TableName tableName, 1115 TableState.State actual) throws IOException { 1116 updateTableState(conn, new TableState(tableName, actual)); 1117 } 1118 1119 /** 1120 * Decode table state from META Result. 
   * Should contain cell from HConstants.TABLE_FAMILY
   * @return null if not found
   */
  @Nullable
  public static TableState getTableState(Result r) throws IOException {
    Cell cell = r.getColumnLatestCell(getTableFamily(), getTableStateColumn());
    if (cell == null) {
      return null;
    }
    try {
      // Row key of a table-state row is the table name itself.
      return TableState.parseFrom(TableName.valueOf(r.getRow()),
        Arrays.copyOfRange(cell.getValueArray(), cell.getValueOffset(),
          cell.getValueOffset() + cell.getValueLength()));
    } catch (DeserializationException e) {
      throw new IOException(e);
    }
  }

  /**
   * Implementations 'visit' a catalog table row.
   */
  public interface Visitor {
    /**
     * Visit the catalog table row.
     * @param r A row from catalog table
     * @return True if we are to proceed scanning the table, else false if
     * we are to stop now.
     */
    boolean visit(final Result r) throws IOException;
  }

  /**
   * Implementations 'visit' a catalog table row but with close() at the end.
   */
  public interface CloseableVisitor extends Visitor, Closeable {
  }

  /**
   * A {@link Visitor} that collects content out of passed {@link Result}.
   */
  static abstract class CollectingVisitor<T> implements Visitor {
    // Accumulates whatever subclasses extract in add(); exposed via getResults().
    final List<T> results = new ArrayList<>();
    @Override
    public boolean visit(Result r) throws IOException {
      if (r != null && !r.isEmpty()) {
        add(r);
      }
      return true;
    }

    // Extract the interesting part of a non-empty Result into this.results.
    abstract void add(Result r);

    /**
     * @return Collected results; wait till visits complete to collect all
     * possible results
     */
    List<T> getResults() {
      return this.results;
    }
  }

  /**
   * Collects all returned.
   */
  static class CollectAllVisitor extends CollectingVisitor<Result> {
    @Override
    void add(Result r) {
      this.results.add(r);
    }
  }

  /**
   * A Visitor that skips offline regions and split parents
   */
  public static abstract class DefaultVisitorBase implements Visitor {

    DefaultVisitorBase() {
      super();
    }

    public abstract boolean visitInternal(Result rowResult) throws IOException;

    @Override
    public boolean visit(Result rowResult) throws IOException {
      RegionInfo info = getRegionInfo(rowResult);
      if (info == null) {
        // Not a region row (e.g. table-state row); keep scanning.
        return true;
      }

      //skip over offline and split regions
      if (!(info.isOffline() || info.isSplit())) {
        return visitInternal(rowResult);
      }
      return true;
    }
  }

  /**
   * Count regions in <code>hbase:meta</code> for passed table.
   * @param c Configuration object
   * @param tableName table name to count regions for
   * @return Count or regions in table <code>tableName</code>
   */
  public static int getRegionCount(final Configuration c, final TableName tableName)
      throws IOException {
    // Opens (and closes) a short-lived connection just for the count.
    try (Connection connection = ConnectionFactory.createConnection(c)) {
      return getRegionCount(connection, tableName);
    }
  }

  /**
   * Count regions in <code>hbase:meta</code> for passed table.
   * @param connection Connection object
   * @param tableName table name to count regions for
   * @return Count or regions in table <code>tableName</code>
   */
  public static int getRegionCount(final Connection connection, final TableName tableName)
      throws IOException {
    try (RegionLocator locator = connection.getRegionLocator(tableName)) {
      List<HRegionLocation> locations = locator.getAllRegionLocations();
      return locations == null ?
        0 : locations.size();
    }
  }

  ////////////////////////
  // Editing operations //
  ////////////////////////
  /**
   * Generates and returns a Put containing the region into for the catalog table
   * @param regionInfo region to serialize into the Put
   * @param ts timestamp to use for the Put cells
   */
  public static Put makePutFromRegionInfo(RegionInfo regionInfo, long ts) throws IOException {
    Put put = new Put(regionInfo.getRegionName(), ts);
    addRegionInfo(put, regionInfo);
    return put;
  }

  /**
   * Generates and returns a Delete containing the region info for the catalog table
   * (deletes the whole catalog family for the region's row up to the given timestamp).
   */
  private static Delete makeDeleteFromRegionInfo(RegionInfo regionInfo, long ts) {
    if (regionInfo == null) {
      throw new IllegalArgumentException("Can't make a delete for null region");
    }
    Delete delete = new Delete(regionInfo.getRegionName());
    delete.addFamily(getCatalogFamily(), ts);
    return delete;
  }

  /**
   * Adds split daughters to the Put
   * (serialized RegionInfos under the SPLITA/SPLITB qualifiers; null daughters are skipped).
   */
  private static Put addDaughtersToPut(Put put, RegionInfo splitA, RegionInfo splitB)
      throws IOException {
    if (splitA != null) {
      put.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
          .setRow(put.getRow())
          .setFamily(HConstants.CATALOG_FAMILY)
          .setQualifier(HConstants.SPLITA_QUALIFIER)
          .setTimestamp(put.getTimestamp())
          .setType(Type.Put)
          .setValue(RegionInfo.toByteArray(splitA))
          .build());
    }
    if (splitB != null) {
      put.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
          .setRow(put.getRow())
          .setFamily(HConstants.CATALOG_FAMILY)
          .setQualifier(HConstants.SPLITB_QUALIFIER)
          .setTimestamp(put.getTimestamp())
          .setType(Type.Put)
          .setValue(RegionInfo.toByteArray(splitB))
          .build());
    }
    return put;
  }

  /**
   * Put the passed <code>p</code> to the <code>hbase:meta</code> table.
   * @param connection connection we're using
   * @param p Put to add to hbase:meta
   */
  private static void putToMetaTable(Connection connection, Put p) throws IOException {
    try (Table table = getMetaHTable(connection)) {
      put(table, p);
    }
  }

  /**
   * @param t Table to use
   * @param p put to make
   */
  private static void put(Table t, Put p) throws IOException {
    debugLogMutation(p);
    t.put(p);
  }

  /**
   * Put the passed <code>ps</code> to the <code>hbase:meta</code> table.
   * @param connection connection we're using
   * @param ps Put to add to hbase:meta
   */
  public static void putsToMetaTable(final Connection connection, final List<Put> ps)
      throws IOException {
    if (ps.isEmpty()) {
      return;
    }
    try (Table t = getMetaHTable(connection)) {
      debugLogMutations(ps);
      // the implementation for putting a single Put is much simpler so here we do a check first.
      if (ps.size() == 1) {
        t.put(ps.get(0));
      } else {
        t.put(ps);
      }
    }
  }

  /**
   * Delete the passed <code>d</code> from the <code>hbase:meta</code> table.
   * @param connection connection we're using
   * @param d Delete to add to hbase:meta
   */
  private static void deleteFromMetaTable(final Connection connection, final Delete d)
      throws IOException {
    List<Delete> dels = new ArrayList<>(1);
    dels.add(d);
    deleteFromMetaTable(connection, dels);
  }

  /**
   * Delete the passed <code>deletes</code> from the <code>hbase:meta</code> table.
   * @param connection connection we're using
   * @param deletes Deletes to add to hbase:meta This list should support #remove.
   */
  private static void deleteFromMetaTable(final Connection connection, final List<Delete> deletes)
      throws IOException {
    try (Table t = getMetaHTable(connection)) {
      debugLogMutations(deletes);
      t.delete(deletes);
    }
  }

  /**
   * Deletes some replica columns corresponding to replicas for the passed rows
   * @param metaRows rows in hbase:meta
   * @param replicaIndexToDeleteFrom the replica ID we would start deleting from
   * @param numReplicasToRemove how many replicas to remove
   * @param connection connection we're using to access meta table
   */
  public static void removeRegionReplicasFromMeta(Set<byte[]> metaRows,
    int replicaIndexToDeleteFrom, int numReplicasToRemove, Connection connection)
      throws IOException {
    int absoluteIndex = replicaIndexToDeleteFrom + numReplicasToRemove;
    for (byte[] row : metaRows) {
      // One Delete per meta row, covering all five per-replica columns for each
      // replica index in [replicaIndexToDeleteFrom, absoluteIndex).
      long now = EnvironmentEdgeManager.currentTime();
      Delete deleteReplicaLocations = new Delete(row);
      for (int i = replicaIndexToDeleteFrom; i < absoluteIndex; i++) {
        deleteReplicaLocations.addColumns(getCatalogFamily(),
          getServerColumn(i), now);
        deleteReplicaLocations.addColumns(getCatalogFamily(),
          getSeqNumColumn(i), now);
        deleteReplicaLocations.addColumns(getCatalogFamily(),
          getStartCodeColumn(i), now);
        deleteReplicaLocations.addColumns(getCatalogFamily(), getServerNameColumn(i), now);
        deleteReplicaLocations.addColumns(getCatalogFamily(), getRegionStateColumn(i), now);
      }

      deleteFromMetaTable(connection, deleteReplicaLocations);
    }
  }

  // Records the region's RegionState.State name under the state qualifier of the Put's row.
  private static void addRegionStateToPut(Put put, RegionState.State state) throws IOException {
    put.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
        .setRow(put.getRow())
        .setFamily(HConstants.CATALOG_FAMILY)
        .setQualifier(getRegionStateColumn())
        .setTimestamp(put.getTimestamp())
        .setType(Cell.Type.Put)
        .setValue(Bytes.toBytes(state.name()))
        .build());
  }

  /**
   * Adds daughter region infos to hbase:meta row for the specified region. Note that this does not
   * add its daughter's as different rows, but adds information about the daughters in the same row
   * as the parent. Use
   * {@link #splitRegion(Connection, RegionInfo, long, RegionInfo, RegionInfo, ServerName, int)}
   * if you want to do that.
   * @param connection connection we're using
   * @param regionInfo RegionInfo of parent region
   * @param splitA first split daughter of the parent regionInfo
   * @param splitB second split daughter of the parent regionInfo
   * @throws IOException if problem connecting or updating meta
   */
  public static void addSplitsToParent(Connection connection, RegionInfo regionInfo,
      RegionInfo splitA, RegionInfo splitB) throws IOException {
    try (Table meta = getMetaHTable(connection)) {
      Put put = makePutFromRegionInfo(regionInfo, EnvironmentEdgeManager.currentTime());
      addDaughtersToPut(put, splitA, splitB);
      meta.put(put);
      // NOTE(review): here the mutation is logged after the put; sibling helpers log
      // before issuing it — harmless, but inconsistent.
      debugLogMutation(put);
      LOG.debug("Added region {}", regionInfo.getRegionNameAsString());
    }
  }

  /**
   * Adds a (single) hbase:meta row for the specified new region and its daughters. Note that this
   * does not add its daughter's as different rows, but adds information about the daughters
   * in the same row as the parent. Use
   * {@link #splitRegion(Connection, RegionInfo, long, RegionInfo, RegionInfo, ServerName, int)}
   * if you want to do that.
   * @param connection connection we're using
   * @param regionInfo region information
   * @throws IOException if problem connecting or updating meta
   */
  @VisibleForTesting
  public static void addRegionToMeta(Connection connection, RegionInfo regionInfo)
      throws IOException {
    addRegionsToMeta(connection, Collections.singletonList(regionInfo), 1);
  }

  /**
   * Adds a hbase:meta row for each of the specified new regions. Initial state for new regions
   * is CLOSED.
   * @param connection connection we're using
   * @param regionInfos region information list
   * @throws IOException if problem connecting or updating meta
   */
  public static void addRegionsToMeta(Connection connection, List<RegionInfo> regionInfos,
      int regionReplication) throws IOException {
    addRegionsToMeta(connection, regionInfos, regionReplication,
      EnvironmentEdgeManager.currentTime());
  }

  /**
   * Adds a hbase:meta row for each of the specified new regions. Initial state for new regions
   * is CLOSED.
   * @param connection connection we're using
   * @param regionInfos region information list
   * @param ts desired timestamp
   * @throws IOException if problem connecting or updating meta
   */
  private static void addRegionsToMeta(Connection connection, List<RegionInfo> regionInfos,
      int regionReplication, long ts) throws IOException {
    List<Put> puts = new ArrayList<>();
    for (RegionInfo regionInfo : regionInfos) {
      // Only default replicas get their own meta row; replica info lives in that same row.
      if (RegionReplicaUtil.isDefaultReplica(regionInfo)) {
        Put put = makePutFromRegionInfo(regionInfo, ts);
        // New regions are added with initial state of CLOSED.
        addRegionStateToPut(put, RegionState.State.CLOSED);
        // Add empty locations for region replicas so that number of replicas can be cached
        // whenever the primary region is looked up from meta
        for (int i = 1; i < regionReplication; i++) {
          addEmptyLocation(put, i);
        }
        puts.add(put);
      }
    }
    putsToMetaTable(connection, puts);
    LOG.info("Added {} regions to meta.", puts.size());
  }

  /**
   * Serializes each merge-parent RegionInfo into the Put under a
   * MERGE_QUALIFIER_PREFIX + 4-digit counter qualifier.
   */
  static Put addMergeRegions(Put put, Collection<RegionInfo> mergeRegions) throws IOException {
    int limit = 10000; // Arbitrary limit. No room in our formatted 'task0000' below for more.
    int max = mergeRegions.size();
    if (max > limit) {
      // Should never happen!!!!! But just in case.
      throw new RuntimeException("Can't merge " + max + " regions in one go; " + limit +
        " is upper-limit.");
    }
    int counter = 0;
    for (RegionInfo ri: mergeRegions) {
      String qualifier = String.format(HConstants.MERGE_QUALIFIER_PREFIX_STR + "%04d", counter++);
      put.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).
          setRow(put.getRow()).
          setFamily(HConstants.CATALOG_FAMILY).
          setQualifier(Bytes.toBytes(qualifier)).
          setTimestamp(put.getTimestamp()).
          setType(Type.Put).
          setValue(RegionInfo.toByteArray(ri)).
          build());
    }
    return put;
  }

  /**
   * Merge regions into one in an atomic operation. Deletes the merging regions in
   * hbase:meta and adds the merged region.
   * @param connection connection we're using
   * @param mergedRegion the merged region
   * @param parentSeqNum Parent regions to merge and their next open sequence id used
   *          by serial replication. Set to -1 if not needed by this table.
   * @param sn the location of the region
   */
  public static void mergeRegions(Connection connection, RegionInfo mergedRegion,
      Map<RegionInfo, Long> parentSeqNum, ServerName sn, int regionReplication)
      throws IOException {
    try (Table meta = getMetaHTable(connection)) {
      long time = HConstants.LATEST_TIMESTAMP;
      List<Mutation> mutations = new ArrayList<>();
      List<RegionInfo> replicationParents = new ArrayList<>();
      for (Map.Entry<RegionInfo, Long> e: parentSeqNum.entrySet()) {
        RegionInfo ri = e.getKey();
        long seqNum = e.getValue();
        // Deletes for merging regions
        mutations.add(makeDeleteFromRegionInfo(ri, time));
        if (seqNum > 0) {
          mutations.add(makePutForReplicationBarrier(ri, seqNum, time));
          replicationParents.add(ri);
        }
      }
      // Put for parent
      Put putOfMerged = makePutFromRegionInfo(mergedRegion, time);
      putOfMerged = addMergeRegions(putOfMerged, parentSeqNum.keySet());
      // Set initial state to CLOSED.
      // NOTE: If initial state is not set to CLOSED then merged region gets added with the
      // default OFFLINE state. If Master gets restarted after this step, start up sequence of
      // master tries to assign this offline region. This is followed by re-assignments of the
      // merged region from resumed {@link MergeTableRegionsProcedure}
      addRegionStateToPut(putOfMerged, RegionState.State.CLOSED);
      mutations.add(putOfMerged);
      // The merged is a new region, openSeqNum = 1 is fine. ServerName may be null
      // if crash after merge happened but before we got to here.. means in-memory
      // locations of offlined merged, now-closed, regions is lost. Should be ok. We
      // assign the merged region later.
      if (sn != null) {
        addLocation(putOfMerged, sn, 1, mergedRegion.getReplicaId());
      }

      // Add empty locations for region replicas of the merged region so that number of replicas
      // can be cached whenever the primary region is looked up from meta
      for (int i = 1; i < regionReplication; i++) {
        addEmptyLocation(putOfMerged, i);
      }
      // add parent reference for serial replication
      if (!replicationParents.isEmpty()) {
        addReplicationParent(putOfMerged, replicationParents);
      }
      // All mutations are applied atomically through the multi-row-mutation coprocessor.
      byte[] tableRow = Bytes.toBytes(mergedRegion.getRegionNameAsString() + HConstants.DELIMITER);
      multiMutate(meta, tableRow, mutations);
    }
  }

  /**
   * Splits the region into two in an atomic operation. Offlines the parent region with the
   * information that it is split into two, and also adds the daughter regions. Does not add the
   * location information to the daughter regions since they are not open yet.
   * @param connection connection we're using
   * @param parent the parent region which is split
   * @param parentOpenSeqNum the next open sequence id for parent region, used by serial
   *          replication. -1 if not necessary.
   * @param splitA Split daughter region A
   * @param splitB Split daughter region B
   * @param sn the location of the region
   */
  public static void splitRegion(Connection connection, RegionInfo parent, long parentOpenSeqNum,
      RegionInfo splitA, RegionInfo splitB, ServerName sn, int regionReplication)
      throws IOException {
    try (Table meta = getMetaHTable(connection)) {
      long time = EnvironmentEdgeManager.currentTime();
      // Put for parent: mark it offline and split, and record the daughters in its own row.
      Put putParent = makePutFromRegionInfo(RegionInfoBuilder.newBuilder(parent)
                    .setOffline(true)
                    .setSplit(true).build(), time);
      addDaughtersToPut(putParent, splitA, splitB);

      // Puts for daughters
      Put putA = makePutFromRegionInfo(splitA, time);
      Put putB = makePutFromRegionInfo(splitB, time);
      if (parentOpenSeqNum > 0) {
        addReplicationBarrier(putParent, parentOpenSeqNum);
        addReplicationParent(putA, Collections.singletonList(parent));
        addReplicationParent(putB, Collections.singletonList(parent));
      }
      // Set initial state to CLOSED
      // NOTE: If initial state is not set to CLOSED then daughter regions get added with the
      // default OFFLINE state. If Master gets restarted after this step, start up sequence of
      // master tries to assign these offline regions. This is followed by re-assignments of the
      // daughter regions from resumed {@link SplitTableRegionProcedure}
      addRegionStateToPut(putA, RegionState.State.CLOSED);
      addRegionStateToPut(putB, RegionState.State.CLOSED);

      addSequenceNum(putA, 1, splitA.getReplicaId()); // new regions, openSeqNum = 1 is fine.
      addSequenceNum(putB, 1, splitB.getReplicaId());

      // Add empty locations for region replicas of daughters so that number of replicas can be
      // cached whenever the primary region is looked up from meta
      for (int i = 1; i < regionReplication; i++) {
        addEmptyLocation(putA, i);
        addEmptyLocation(putB, i);
      }

      // Parent and both daughters are written in one atomic multi-row mutation.
      byte[] tableRow = Bytes.toBytes(parent.getRegionNameAsString() + HConstants.DELIMITER);
      multiMutate(meta, tableRow, putParent, putA, putB);
    }
  }

  /**
   * Update state of the table in meta.
   * @param connection what we use for update
   * @param state new state
   */
  private static void updateTableState(Connection connection, TableState state) throws IOException {
    Put put = makePutFromTableState(state, EnvironmentEdgeManager.currentTime());
    putToMetaTable(connection, put);
    LOG.info("Updated {} in hbase:meta", state);
  }

  /**
   * Construct PUT for given state
   * @param state new state
   * @param ts timestamp for the Put
   */
  public static Put makePutFromTableState(TableState state, long ts) {
    Put put = new Put(state.getTableName().getName(), ts);
    put.addColumn(getTableFamily(), getTableStateColumn(), state.convert().toByteArray());
    return put;
  }

  /**
   * Remove state for table from meta
   * @param connection to use for deletion
   * @param table to delete state for
   */
  public static void deleteTableState(Connection connection, TableName table)
      throws IOException {
    long time = EnvironmentEdgeManager.currentTime();
    Delete delete = new Delete(table.getName());
    delete.addColumns(getTableFamily(), getTableStateColumn(), time);
    deleteFromMetaTable(connection, delete);
    LOG.info("Deleted table " + table + " state from META");
  }

  // Varargs convenience overload for the List-based multiMutate below.
  private static void multiMutate(Table table, byte[] row,
      Mutation... mutations) throws IOException {
    multiMutate(table, row, Arrays.asList(mutations));
  }

  /**
   * Performs an atomic multi-mutate operation against the given table.
   * All mutations are sent to the MultiRowMutationService coprocessor hosted on the
   * region containing <code>row</code>, so they commit (or fail) together.
   */
  private static void multiMutate(final Table table, byte[] row, final List<Mutation> mutations)
      throws IOException {
    debugLogMutations(mutations);
    Batch.Call<MultiRowMutationService, MutateRowsResponse> callable =
      new Batch.Call<MultiRowMutationService, MutateRowsResponse>() {

        @Override
        public MutateRowsResponse call(MultiRowMutationService instance) throws IOException {
          MutateRowsRequest.Builder builder = MutateRowsRequest.newBuilder();
          for (Mutation mutation : mutations) {
            if (mutation instanceof Put) {
              builder.addMutationRequest(
                ProtobufUtil.toMutation(ClientProtos.MutationProto.MutationType.PUT, mutation));
            } else if (mutation instanceof Delete) {
              builder.addMutationRequest(
                ProtobufUtil.toMutation(ClientProtos.MutationProto.MutationType.DELETE, mutation));
            } else {
              // Only Put and Delete can be expressed as MutationProto here.
              throw new DoNotRetryIOException(
                "multi in MetaEditor doesn't support " + mutation.getClass().getName());
            }
          }
          ServerRpcController controller = new ServerRpcController();
          CoprocessorRpcUtils.BlockingRpcCallback<MutateRowsResponse> rpcCallback =
            new CoprocessorRpcUtils.BlockingRpcCallback<>();
          instance.mutateRows(controller, builder.build(), rpcCallback);
          MutateRowsResponse resp = rpcCallback.get();
          if (controller.failedOnException()) {
            throw controller.getFailedOn();
          }
          return resp;
        }
      };
    try {
      // Start and stop row are the same so the call targets exactly one region.
      table.coprocessorService(MultiRowMutationService.class, row, row, callable);
    } catch (Throwable e) {
      Throwables.propagateIfPossible(e, IOException.class);
      throw new IOException(e);
    }
  }

  /**
   * Updates the location of the specified region in hbase:meta to be the specified server hostname
   * and startcode.
   * <p>
   * Uses passed catalog tracker to get a connection to the server hosting hbase:meta and makes
   * edits to that region.
   * @param connection connection we're using
   * @param regionInfo region to update location of
   * @param openSeqNum the latest sequence number obtained when the region was open
   * @param sn Server name
   * @param masterSystemTime wall clock time from master if passed in the open region RPC
   */
  @VisibleForTesting
  public static void updateRegionLocation(Connection connection, RegionInfo regionInfo,
      ServerName sn, long openSeqNum, long masterSystemTime) throws IOException {
    updateLocation(connection, regionInfo, sn, openSeqNum, masterSystemTime);
  }

  /**
   * Updates the location of the specified region to be the specified server.
   * <p>
   * Connects to the specified server which should be hosting the specified catalog region name to
   * perform the edit.
   * @param connection connection we're using
   * @param regionInfo region to update location of
   * @param sn Server name
   * @param openSeqNum the latest sequence number obtained when the region was open
   * @param masterSystemTime wall clock time from master if passed in the open region RPC
   * @throws IOException In particular could throw {@link java.net.ConnectException} if the server
   *           is down on other end.
1731 */ 1732 private static void updateLocation(Connection connection, RegionInfo regionInfo, ServerName sn, 1733 long openSeqNum, long masterSystemTime) throws IOException { 1734 // region replicas are kept in the primary region's row 1735 Put put = new Put(getMetaKeyForRegion(regionInfo), masterSystemTime); 1736 addRegionInfo(put, regionInfo); 1737 addLocation(put, sn, openSeqNum, regionInfo.getReplicaId()); 1738 putToMetaTable(connection, put); 1739 LOG.info("Updated row {} with server=", regionInfo.getRegionNameAsString(), sn); 1740 } 1741 1742 /** 1743 * Deletes the specified region from META. 1744 * @param connection connection we're using 1745 * @param regionInfo region to be deleted from META 1746 */ 1747 public static void deleteRegionInfo(Connection connection, RegionInfo regionInfo) 1748 throws IOException { 1749 Delete delete = new Delete(regionInfo.getRegionName()); 1750 delete.addFamily(getCatalogFamily(), HConstants.LATEST_TIMESTAMP); 1751 deleteFromMetaTable(connection, delete); 1752 LOG.info("Deleted " + regionInfo.getRegionNameAsString()); 1753 } 1754 1755 /** 1756 * Deletes the specified regions from META. 1757 * @param connection connection we're using 1758 * @param regionsInfo list of regions to be deleted from META 1759 */ 1760 public static void deleteRegionInfos(Connection connection, List<RegionInfo> regionsInfo) 1761 throws IOException { 1762 deleteRegionInfos(connection, regionsInfo, EnvironmentEdgeManager.currentTime()); 1763 } 1764 1765 /** 1766 * Deletes the specified regions from META. 
1767 * @param connection connection we're using 1768 * @param regionsInfo list of regions to be deleted from META 1769 */ 1770 private static void deleteRegionInfos(Connection connection, List<RegionInfo> regionsInfo, 1771 long ts) 1772 throws IOException { 1773 List<Delete> deletes = new ArrayList<>(regionsInfo.size()); 1774 for (RegionInfo hri : regionsInfo) { 1775 Delete e = new Delete(hri.getRegionName()); 1776 e.addFamily(getCatalogFamily(), ts); 1777 deletes.add(e); 1778 } 1779 deleteFromMetaTable(connection, deletes); 1780 LOG.info("Deleted {} regions from META", regionsInfo.size()); 1781 LOG.debug("Deleted regions: {}", regionsInfo); 1782 } 1783 1784 /** 1785 * Overwrites the specified regions from hbase:meta. Deletes old rows for the given regions and 1786 * adds new ones. Regions added back have state CLOSED. 1787 * @param connection connection we're using 1788 * @param regionInfos list of regions to be added to META 1789 */ 1790 public static void overwriteRegions(Connection connection, List<RegionInfo> regionInfos, 1791 int regionReplication) throws IOException { 1792 // use master time for delete marker and the Put 1793 long now = EnvironmentEdgeManager.currentTime(); 1794 deleteRegionInfos(connection, regionInfos, now); 1795 // Why sleep? This is the easiest way to ensure that the previous deletes does not 1796 // eclipse the following puts, that might happen in the same ts from the server. 1797 // See HBASE-9906, and HBASE-9879. Once either HBASE-9879, HBASE-8770 is fixed, 1798 // or HBASE-9905 is fixed and meta uses seqIds, we do not need the sleep. 1799 // 1800 // HBASE-13875 uses master timestamp for the mutations. The 20ms sleep is not needed 1801 addRegionsToMeta(connection, regionInfos, regionReplication, now + 1); 1802 LOG.info("Overwritten " + regionInfos.size() + " regions to Meta"); 1803 LOG.debug("Overwritten regions: {} ", regionInfos); 1804 } 1805 1806 /** 1807 * Deletes merge qualifiers for the specified merge region. 
   * @param connection connection we're using
   * @param mergeRegion the merged region
   */
  public static void deleteMergeQualifiers(Connection connection, final RegionInfo mergeRegion)
      throws IOException {
    Delete delete = new Delete(mergeRegion.getRegionName());
    // NOTE: We are doing a new hbase:meta read here.
    Cell[] cells = getRegionResult(connection, mergeRegion.getRegionName()).rawCells();
    if (cells == null || cells.length == 0) {
      return;
    }
    List<byte[]> qualifiers = new ArrayList<>();
    for (Cell cell : cells) {
      // Only columns whose qualifier carries the merge-reference prefix are removed; everything
      // else on the row is left untouched.
      if (!isMergeQualifierPrefix(cell)) {
        continue;
      }
      byte[] qualifier = CellUtil.cloneQualifier(cell);
      qualifiers.add(qualifier);
      delete.addColumns(getCatalogFamily(), qualifier, HConstants.LATEST_TIMESTAMP);
    }
    deleteFromMetaTable(connection, delete);
    LOG.info("Deleted merge references in " + mergeRegion.getRegionNameAsString() +
      ", deleted qualifiers " + qualifiers.stream().map(Bytes::toStringBinary).
      collect(Collectors.joining(", ")));
  }

  /**
   * Adds an info:regioninfo cell for the given region to the passed Put, reusing the Put's row
   * and timestamp.
   * @return the same Put, for chaining
   */
  public static Put addRegionInfo(final Put p, final RegionInfo hri)
      throws IOException {
    p.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
      .setRow(p.getRow())
      .setFamily(getCatalogFamily())
      .setQualifier(HConstants.REGIONINFO_QUALIFIER)
      .setTimestamp(p.getTimestamp())
      .setType(Type.Put)
      // Serialize the Default Replica HRI otherwise scan of hbase:meta
      // shows an info:regioninfo value with encoded name and region
      // name that differs from that of the hbase:meta row.
      .setValue(RegionInfo.toByteArray(RegionReplicaUtil.getRegionInfoForDefaultReplica(hri)))
      .build());
    return p;
  }

  /**
   * Adds server address, startcode and open-seqnum cells for the given replica to the passed Put,
   * reusing the Put's row and timestamp.
   * @param p put to append location cells to
   * @param sn server hosting the region
   * @param openSeqNum the latest sequence number obtained when the region was open
   * @param replicaId replica whose columns are written
   * @return the same Put, for chaining
   */
  public static Put addLocation(Put p, ServerName sn, long openSeqNum, int replicaId)
      throws IOException {
    CellBuilder builder = CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY);
    return p.add(builder.clear()
        .setRow(p.getRow())
        .setFamily(getCatalogFamily())
        .setQualifier(getServerColumn(replicaId))
        .setTimestamp(p.getTimestamp())
        .setType(Cell.Type.Put)
        .setValue(Bytes.toBytes(sn.getAddress().toString()))
        .build())
      .add(builder.clear()
        .setRow(p.getRow())
        .setFamily(getCatalogFamily())
        .setQualifier(getStartCodeColumn(replicaId))
        .setTimestamp(p.getTimestamp())
        .setType(Cell.Type.Put)
        .setValue(Bytes.toBytes(sn.getStartcode()))
        .build())
      .add(builder.clear()
        .setRow(p.getRow())
        .setFamily(getCatalogFamily())
        .setQualifier(getSeqNumColumn(replicaId))
        .setTimestamp(p.getTimestamp())
        .setType(Type.Put)
        .setValue(Bytes.toBytes(openSeqNum))
        .build());
  }

  // Writes a region name into the stream, doubling every ESCAPE_BYTE so that an unescaped
  // ESCAPE_BYTE+SEPARATED_BYTE pair can later serve as an unambiguous record separator
  // (see getParentsBytes/parseParentsBytes).
  private static void writeRegionName(ByteArrayOutputStream out, byte[] regionName) {
    for (byte b : regionName) {
      if (b == ESCAPE_BYTE) {
        out.write(ESCAPE_BYTE);
      }
      out.write(b);
    }
  }

  /**
   * Serializes a list of parent region names into one byte[] value, joining the escaped names
   * with the ESCAPE_BYTE+SEPARATED_BYTE pair.
   * <p>
   * NOTE(review): the unconditional first {@code iter.next()} means an empty list throws
   * NoSuchElementException — callers appear to always pass at least one parent; confirm.
   */
  @VisibleForTesting
  public static byte[] getParentsBytes(List<RegionInfo> parents) {
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    Iterator<RegionInfo> iter = parents.iterator();
    writeRegionName(bos, iter.next().getRegionName());
    while (iter.hasNext()) {
      bos.write(ESCAPE_BYTE);
      bos.write(SEPARATED_BYTE);
      writeRegionName(bos, iter.next().getRegionName());
    }
    return bos.toByteArray();
  }

  /**
   * Inverse of {@link #getParentsBytes(List)}: splits on unescaped separators and collapses
   * doubled ESCAPE_BYTEs back to single bytes.
   */
  private static List<byte[]> parseParentsBytes(byte[] bytes) {
    List<byte[]> parents = new ArrayList<>();
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    for (int i = 0; i < bytes.length; i++) {
      if (bytes[i] == ESCAPE_BYTE) {
        // NOTE(review): assumes well-formed input; a trailing lone ESCAPE_BYTE would make this
        // i++ index past the end of the array.
        i++;
        if (bytes[i] == SEPARATED_BYTE) {
          // Unescaped separator: close out the current name.
          parents.add(bos.toByteArray());
          bos.reset();
          continue;
        }
        // fall through to append the byte
      }
      bos.write(bytes[i]);
    }
    if (bos.size() > 0) {
      parents.add(bos.toByteArray());
    }
    return parents;
  }

  // Stores the serialized parent region names under the replication-parent qualifier in the
  // replication barrier family, reusing the Put's row and timestamp.
  private static void addReplicationParent(Put put, List<RegionInfo> parents) throws IOException {
    byte[] value = getParentsBytes(parents);
    put.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setRow(put.getRow())
      .setFamily(HConstants.REPLICATION_BARRIER_FAMILY).setQualifier(REPLICATION_PARENT_QUALIFIER)
      .setTimestamp(put.getTimestamp()).setType(Type.Put).setValue(value).build());
  }

  /**
   * Builds a Put carrying only a replication barrier cell for the given region at timestamp
   * {@code ts}.
   */
  public static Put makePutForReplicationBarrier(RegionInfo regionInfo, long openSeqNum, long ts)
      throws IOException {
    Put put = new Put(regionInfo.getRegionName(), ts);
    addReplicationBarrier(put, openSeqNum);
    return put;
  }

  /**
   * See class comment on SerialReplicationChecker
   */
  public static void addReplicationBarrier(Put put, long openSeqNum) throws IOException {
    put.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
      .setRow(put.getRow())
      .setFamily(HConstants.REPLICATION_BARRIER_FAMILY)
      .setQualifier(HConstants.SEQNUM_QUALIFIER)
      .setTimestamp(put.getTimestamp())
      .setType(Type.Put)
      .setValue(Bytes.toBytes(openSeqNum))
      .build());
  }

  // Writes valueless server/startcode/seqnum cells for the given replica. Readers can then infer
  // the configured replica count from the bare columns even before the replica is assigned.
  private static Put addEmptyLocation(Put p, int replicaId) throws IOException {
    CellBuilder builder = CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY);
    return p.add(builder.clear()
        .setRow(p.getRow())
        .setFamily(getCatalogFamily())
        .setQualifier(getServerColumn(replicaId))
        .setTimestamp(p.getTimestamp())
        .setType(Type.Put)
        .build())
      .add(builder.clear()
        .setRow(p.getRow())
        .setFamily(getCatalogFamily())
        .setQualifier(getStartCodeColumn(replicaId))
        .setTimestamp(p.getTimestamp())
        .setType(Cell.Type.Put)
        .build())
      .add(builder.clear()
        .setRow(p.getRow())
        .setFamily(getCatalogFamily())
        .setQualifier(getSeqNumColumn(replicaId))
        .setTimestamp(p.getTimestamp())
        .setType(Cell.Type.Put)
        .build());
  }

  /**
   * Result of a replication-barrier lookup: the barrier seq nums (sorted, de-duplicated), the
   * region's recorded state (null when no state column was present) and the parent region names
   * (empty when no parent column was present).
   */
  public static final class ReplicationBarrierResult {
    private final long[] barriers;
    private final RegionState.State state;
    private final List<byte[]> parentRegionNames;

    ReplicationBarrierResult(long[] barriers, State state, List<byte[]> parentRegionNames) {
      this.barriers = barriers;
      this.state = state;
      this.parentRegionNames = parentRegionNames;
    }

    public long[] getBarriers() {
      return barriers;
    }

    public RegionState.State getState() {
      return state;
    }

    public List<byte[]> getParentRegionNames() {
      return parentRegionNames;
    }

    @Override
    public String toString() {
      return "ReplicationBarrierResult [barriers=" + Arrays.toString(barriers) + ", state=" +
        state + ", parentRegionNames=" +
        parentRegionNames.stream().map(Bytes::toStringBinary).collect(Collectors.joining(", ")) +
        "]";
    }
  }

  // Decodes the value of one barrier cell as a long sequence number.
  private static long getReplicationBarrier(Cell c) {
    return Bytes.toLong(c.getValueArray(), c.getValueOffset(), c.getValueLength());
  }

  /**
   * Extracts all barrier seq nums stored on a region row, sorted ascending with duplicates
   * removed.
   */
  public static long[] getReplicationBarriers(Result result) {
    return result.getColumnCells(HConstants.REPLICATION_BARRIER_FAMILY, HConstants.SEQNUM_QUALIFIER)
      .stream().mapToLong(MetaTableAccessor::getReplicationBarrier).sorted().distinct().toArray();
  }

  // Assembles a ReplicationBarrierResult from a meta row: barriers from the barrier family,
  // state from the catalog family's state column, parents from the replication-parent column.
  private static ReplicationBarrierResult getReplicationBarrierResult(Result result) {
    long[] barriers = getReplicationBarriers(result);
    byte[] stateBytes = result.getValue(getCatalogFamily(), getRegionStateColumn());
    RegionState.State state =
      stateBytes != null ? RegionState.State.valueOf(Bytes.toString(stateBytes)) : null;
    byte[] parentRegionsBytes =
      result.getValue(HConstants.REPLICATION_BARRIER_FAMILY, REPLICATION_PARENT_QUALIFIER);
    List<byte[]> parentRegionNames =
      parentRegionsBytes != null ? parseParentsBytes(parentRegionsBytes) : Collections.emptyList();
    return new ReplicationBarrierResult(barriers, state, parentRegionNames);
  }

  /**
   * Looks up the replication barrier info for the region of {@code tableName} whose encoded name
   * is {@code encodedRegionName}, scanning meta in reverse from the row key built around
   * {@code row}. Returns an empty result (no barriers, null state) when no matching row exists.
   */
  public static ReplicationBarrierResult getReplicationBarrierResult(Connection conn,
      TableName tableName, byte[] row, byte[] encodedRegionName) throws IOException {
    byte[] metaStartKey = RegionInfo.createRegionName(tableName, row, HConstants.NINES, false);
    byte[] metaStopKey =
      RegionInfo.createRegionName(tableName, HConstants.EMPTY_START_ROW, "", false);
    Scan scan = new Scan().withStartRow(metaStartKey).withStopRow(metaStopKey)
      .addColumn(getCatalogFamily(), getRegionStateColumn())
      .addFamily(HConstants.REPLICATION_BARRIER_FAMILY).readAllVersions().setReversed(true)
      .setCaching(10);
    try (Table table = getMetaHTable(conn); ResultScanner scanner = table.getScanner(scan)) {
      for (Result result;;) {
        result = scanner.next();
        if (result == null) {
          return new ReplicationBarrierResult(new long[0], null, Collections.emptyList());
        }
        byte[] regionName = result.getRow();
        // TODO: we may look up a region which has already been split or merged so we need to check
        // whether the encoded name matches. Need to find a way to quit earlier when there is no
        // record for the given region, for now it will scan to the end of the table.
        if (!Bytes.equals(encodedRegionName,
          Bytes.toBytes(RegionInfo.encodeRegionName(regionName)))) {
          continue;
        }
        return getReplicationBarrierResult(result);
      }
    }
  }

  /**
   * Fetches all barrier seq nums for the given region name with a single Get against meta.
   */
  public static long[] getReplicationBarrier(Connection conn, byte[] regionName)
      throws IOException {
    try (Table table = getMetaHTable(conn)) {
      Result result = table.get(new Get(regionName)
        .addColumn(HConstants.REPLICATION_BARRIER_FAMILY, HConstants.SEQNUM_QUALIFIER)
        .readAllVersions());
      return getReplicationBarriers(result);
    }
  }

  /**
   * Scans meta for the given table and returns (encodedRegionName, lastBarrier) pairs for every
   * region row that carries a barrier seq num column; rows without one are skipped.
   */
  public static List<Pair<String, Long>> getTableEncodedRegionNameAndLastBarrier(Connection conn,
      TableName tableName) throws IOException {
    List<Pair<String, Long>> list = new ArrayList<>();
    scanMeta(conn, getTableStartRowForMeta(tableName, QueryType.REPLICATION),
      getTableStopRowForMeta(tableName, QueryType.REPLICATION), QueryType.REPLICATION, r -> {
        byte[] value =
          r.getValue(HConstants.REPLICATION_BARRIER_FAMILY, HConstants.SEQNUM_QUALIFIER);
        if (value == null) {
          return true;
        }
        long lastBarrier = Bytes.toLong(value);
        String encodedRegionName = RegionInfo.encodeRegionName(r.getRow());
        list.add(Pair.newPair(encodedRegionName, lastBarrier));
        return true;
      });
    return list;
  }

  /**
   * Scans meta for the given table and returns the encoded region name of every row; uses a
   * FirstKeyOnlyFilter since only the row keys are needed.
   */
  public static List<String> getTableEncodedRegionNamesForSerialReplication(Connection conn,
      TableName tableName) throws IOException {
    List<String> list = new ArrayList<>();
    scanMeta(conn, getTableStartRowForMeta(tableName, QueryType.REPLICATION),
      getTableStopRowForMeta(tableName, QueryType.REPLICATION), QueryType.REPLICATION,
      new FirstKeyOnlyFilter(), Integer.MAX_VALUE, r -> {
        list.add(RegionInfo.encodeRegionName(r.getRow()));
        return true;
      });
    return list;
  }

  // Debug-logs each mutation, one per call, when the meta log is at DEBUG.
  private static void debugLogMutations(List<? extends Mutation> mutations) throws IOException {
    if (!METALOG.isDebugEnabled()) {
      return;
    }
    // Logging each mutation in separate line makes it easier to see diff between them visually
    // because of common starting indentation.
    for (Mutation mutation : mutations) {
      debugLogMutation(mutation);
    }
  }

  private static void debugLogMutation(Mutation p) throws IOException {
    METALOG.debug("{} {}", p.getClass().getSimpleName(), p.toJSON());
  }

  // Adds the open-seqnum cell for the given replica to the Put, reusing its row and timestamp.
  private static Put addSequenceNum(Put p, long openSeqNum, int replicaId) throws IOException {
    return p.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
      .setRow(p.getRow())
      .setFamily(HConstants.CATALOG_FAMILY)
      .setQualifier(getSeqNumColumn(replicaId))
      .setTimestamp(p.getTimestamp())
      .setType(Type.Put)
      .setValue(Bytes.toBytes(openSeqNum))
      .build());
  }
}