001/* 002* 003* Licensed to the Apache Software Foundation (ASF) under one 004* or more contributor license agreements. See the NOTICE file 005* distributed with this work for additional information 006* regarding copyright ownership. The ASF licenses this file 007* to you under the Apache License, Version 2.0 (the 008* "License"); you may not use this file except in compliance 009* with the License. You may obtain a copy of the License at 010* 011* http://www.apache.org/licenses/LICENSE-2.0 012* 013* Unless required by applicable law or agreed to in writing, software 014* distributed under the License is distributed on an "AS IS" BASIS, 015* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016* See the License for the specific language governing permissions and 017* limitations under the License. 018*/ 019package org.apache.hadoop.hbase.replication; 020 021import org.apache.hadoop.hbase.CompareOperator; 022import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 023import org.apache.hadoop.conf.Configuration; 024import org.apache.hadoop.hbase.Abortable; 025import org.apache.hadoop.hbase.HColumnDescriptor; 026import org.apache.hadoop.hbase.HConstants; 027import org.apache.hadoop.hbase.HTableDescriptor; 028import org.apache.hadoop.hbase.NamespaceDescriptor; 029import org.apache.hadoop.hbase.TableExistsException; 030import org.apache.hadoop.hbase.TableName; 031import org.apache.yetus.audience.InterfaceAudience; 032import org.apache.hadoop.hbase.client.Admin; 033import org.apache.hadoop.hbase.client.Connection; 034import org.apache.hadoop.hbase.client.ConnectionFactory; 035import org.apache.hadoop.hbase.client.Get; 036import org.apache.hadoop.hbase.client.Result; 037import org.apache.hadoop.hbase.client.ResultScanner; 038import org.apache.hadoop.hbase.client.Scan; 039import org.apache.hadoop.hbase.client.Table; 040import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; 041import org.apache.hadoop.hbase.regionserver.BloomType; 042import org.apache.hadoop.hbase.util.Bytes; 043import org.apache.hadoop.hbase.util.RetryCounter; 044import org.apache.hadoop.hbase.util.RetryCounterFactory; 045 046import java.io.IOException; 047import java.io.InterruptedIOException; 048import java.util.ArrayList; 049import java.util.Arrays; 050import java.util.HashSet; 051import java.util.List; 052import java.util.Map; 053import java.util.Set; 054import java.util.concurrent.CountDownLatch; 055import java.util.concurrent.Executor; 056import java.util.concurrent.LinkedBlockingQueue; 057import java.util.concurrent.ThreadPoolExecutor; 058import java.util.concurrent.TimeUnit; 059 060/* 061 * Abstract class that provides an interface to the Replication Table. Which is currently 062 * being used for WAL offset tracking. 063 * The basic schema of this table will store each individual queue as a 064 * seperate row. The row key will be a unique identifier of the creating server's name and the 065 * queueId. Each queue must have the following two columns: 066 * COL_QUEUE_OWNER: tracks which server is currently responsible for tracking the queue 067 * COL_QUEUE_OWNER_HISTORY: a "|" delimited list of the previous server's that have owned this 068 * queue. The most recent previous owner is the leftmost entry. 069 * They will also have columns mapping [WAL filename : offset] 070 * The most flexible method of interacting with the Replication Table is by calling 071 * getOrBlockOnReplicationTable() which will return a new copy of the Replication Table. It is up 072 * to the caller to close the returned table. 073 */ 074@InterfaceAudience.Private 075abstract class ReplicationTableBase { 076 077 /** Name of the HBase Table used for tracking replication*/ 078 public static final TableName REPLICATION_TABLE_NAME = 079 TableName.valueOf(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR, "replication"); 080 081 // Column family and column names for Queues in the Replication Table 082 public static final byte[] CF_QUEUE = Bytes.toBytes("q"); 083 public static final byte[] COL_QUEUE_OWNER = Bytes.toBytes("o"); 084 public static final byte[] COL_QUEUE_OWNER_HISTORY = Bytes.toBytes("h"); 085 086 // Column Descriptor for the Replication Table 087 private static final HColumnDescriptor REPLICATION_COL_DESCRIPTOR = 088 new HColumnDescriptor(CF_QUEUE).setMaxVersions(1) 089 .setInMemory(true) 090 .setScope(HConstants.REPLICATION_SCOPE_LOCAL) 091 // TODO: Figure out which bloom filter to use 092 .setBloomFilterType(BloomType.NONE); 093 094 // The value used to delimit the queueId and server name inside of a queue's row key. Currently a 095 // hyphen, because it is guaranteed that queueId (which is a cluster id) cannot contain hyphens. 096 // See HBASE-11394. 097 public static final String ROW_KEY_DELIMITER = "-"; 098 099 // The value used to delimit server names in the queue history list 100 public static final String QUEUE_HISTORY_DELIMITER = "|"; 101 102 /* 103 * Make sure that HBase table operations for replication have a high number of retries. This is 104 * because the server is aborted if any HBase table operation fails. Each RPC will be attempted 105 * 3600 times before exiting. This provides each operation with 2 hours of retries 106 * before the server is aborted. 107 */ 108 private static final int CLIENT_RETRIES = 3600; 109 private static final int RPC_TIMEOUT = 2000; 110 private static final int OPERATION_TIMEOUT = CLIENT_RETRIES * RPC_TIMEOUT; 111 112 // We only need a single thread to initialize the Replication Table 113 private static final int NUM_INITIALIZE_WORKERS = 1; 114 115 protected final Configuration conf; 116 protected final Abortable abortable; 117 private final Connection connection; 118 private final Executor executor; 119 private volatile CountDownLatch replicationTableInitialized; 120 121 public ReplicationTableBase(Configuration conf, Abortable abort) throws IOException { 122 this.conf = new Configuration(conf); 123 this.abortable = abort; 124 decorateConf(); 125 this.connection = ConnectionFactory.createConnection(this.conf); 126 this.executor = setUpExecutor(); 127 this.replicationTableInitialized = new CountDownLatch(1); 128 createReplicationTableInBackground(); 129 } 130 131 /** 132 * Modify the connection's config so that operations run on the Replication Table have longer and 133 * a larger number of retries 134 */ 135 private void decorateConf() { 136 this.conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, CLIENT_RETRIES); 137 } 138 139 /** 140 * Sets up the thread pool executor used to build the Replication Table in the background 141 * @return the configured executor 142 */ 143 private Executor setUpExecutor() { 144 ThreadPoolExecutor tempExecutor = new ThreadPoolExecutor(NUM_INITIALIZE_WORKERS, 145 NUM_INITIALIZE_WORKERS, 100, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>()); 146 ThreadFactoryBuilder tfb = new ThreadFactoryBuilder(); 147 tfb.setNameFormat("ReplicationTableExecutor-%d"); 148 tfb.setDaemon(true); 149 tempExecutor.setThreadFactory(tfb.build()); 150 return tempExecutor; 151 } 152 153 /** 154 * Get whether the Replication Table has been successfully initialized yet 155 * @return whether the Replication Table is initialized 156 */ 157 public boolean getInitializationStatus() { 158 return replicationTableInitialized.getCount() == 0; 159 } 160 161 /** 162 * Increases the RPC and operations timeouts for the Replication Table 163 */ 164 private Table setReplicationTableTimeOuts(Table replicationTable) { 165 replicationTable.setRpcTimeout(RPC_TIMEOUT); 166 replicationTable.setOperationTimeout(OPERATION_TIMEOUT); 167 return replicationTable; 168 } 169 170 /** 171 * Build the row key for the given queueId. This will uniquely identify it from all other queues 172 * in the cluster. 173 * @param serverName The owner of the queue 174 * @param queueId String identifier of the queue 175 * @return String representation of the queue's row key 176 */ 177 protected String buildQueueRowKey(String serverName, String queueId) { 178 return queueId + ROW_KEY_DELIMITER + serverName; 179 } 180 181 /** 182 * Parse the original queueId from a row key 183 * @param rowKey String representation of a queue's row key 184 * @return the original queueId 185 */ 186 protected String getRawQueueIdFromRowKey(String rowKey) { 187 return rowKey.split(ROW_KEY_DELIMITER)[0]; 188 } 189 190 /** 191 * Returns a queue's row key given either its raw or reclaimed queueId 192 * 193 * @param queueId queueId of the queue 194 * @return byte representation of the queue's row key 195 */ 196 protected byte[] queueIdToRowKey(String serverName, String queueId) { 197 // Cluster id's are guaranteed to have no hyphens, so if the passed in queueId has no hyphen 198 // then this is not a reclaimed queue. 199 if (!queueId.contains(ROW_KEY_DELIMITER)) { 200 return Bytes.toBytes(buildQueueRowKey(serverName, queueId)); 201 // If the queueId contained some hyphen it was reclaimed. In this case, the queueId is the 202 // queue's row key 203 } else { 204 return Bytes.toBytes(queueId); 205 } 206 } 207 208 /** 209 * Creates a "|" delimited record of the queue's past region server owners. 210 * 211 * @param originalHistory the queue's original owner history 212 * @param oldServer the name of the server that used to own the queue 213 * @return the queue's new owner history 214 */ 215 protected String buildClaimedQueueHistory(String originalHistory, String oldServer) { 216 return oldServer + QUEUE_HISTORY_DELIMITER + originalHistory; 217 } 218 219 /** 220 * Get a list of all region servers that have outstanding replication queues. These servers could 221 * be alive, dead or from a previous run of the cluster. 222 * @return a list of server names 223 */ 224 protected List<String> getListOfReplicators() { 225 // scan all of the queues and return a list of all unique OWNER values 226 Set<String> peerServers = new HashSet<>(); 227 ResultScanner allQueuesInCluster = null; 228 try (Table replicationTable = getOrBlockOnReplicationTable()){ 229 Scan scan = new Scan(); 230 scan.addColumn(CF_QUEUE, COL_QUEUE_OWNER); 231 allQueuesInCluster = replicationTable.getScanner(scan); 232 for (Result queue : allQueuesInCluster) { 233 peerServers.add(Bytes.toString(queue.getValue(CF_QUEUE, COL_QUEUE_OWNER))); 234 } 235 } catch (IOException e) { 236 String errMsg = "Failed getting list of replicators"; 237 abortable.abort(errMsg, e); 238 } finally { 239 if (allQueuesInCluster != null) { 240 allQueuesInCluster.close(); 241 } 242 } 243 return new ArrayList<>(peerServers); 244 } 245 246 protected List<String> getAllQueues(String serverName) { 247 List<String> allQueues = new ArrayList<>(); 248 ResultScanner queueScanner = null; 249 try { 250 queueScanner = getQueuesBelongingToServer(serverName); 251 for (Result queue : queueScanner) { 252 String rowKey = Bytes.toString(queue.getRow()); 253 // If the queue does not have a Owner History, then we must be its original owner. So we 254 // want to return its queueId in raw form 255 if (Bytes.toString(queue.getValue(CF_QUEUE, COL_QUEUE_OWNER_HISTORY)).length() == 0) { 256 allQueues.add(getRawQueueIdFromRowKey(rowKey)); 257 } else { 258 allQueues.add(rowKey); 259 } 260 } 261 return allQueues; 262 } catch (IOException e) { 263 String errMsg = "Failed getting list of all replication queues for serverName=" + serverName; 264 abortable.abort(errMsg, e); 265 return null; 266 } finally { 267 if (queueScanner != null) { 268 queueScanner.close(); 269 } 270 } 271 } 272 273 protected List<String> getLogsInQueue(String serverName, String queueId) { 274 String rowKey = queueId; 275 if (!queueId.contains(ROW_KEY_DELIMITER)) { 276 rowKey = buildQueueRowKey(serverName, queueId); 277 } 278 return getLogsInQueue(Bytes.toBytes(rowKey)); 279 } 280 281 protected List<String> getLogsInQueue(byte[] rowKey) { 282 String errMsg = "Failed getting logs in queue queueId=" + Bytes.toString(rowKey); 283 try (Table replicationTable = getOrBlockOnReplicationTable()) { 284 Get getQueue = new Get(rowKey); 285 Result queue = replicationTable.get(getQueue); 286 if (queue == null || queue.isEmpty()) { 287 abortable.abort(errMsg, new ReplicationException(errMsg)); 288 return null; 289 } 290 return readWALsFromResult(queue); 291 } catch (IOException e) { 292 abortable.abort(errMsg, e); 293 return null; 294 } 295 } 296 297 /** 298 * Read all of the WAL's from a queue into a list 299 * 300 * @param queue HBase query result containing the queue 301 * @return a list of all the WAL filenames 302 */ 303 protected List<String> readWALsFromResult(Result queue) { 304 List<String> wals = new ArrayList<>(); 305 Map<byte[], byte[]> familyMap = queue.getFamilyMap(CF_QUEUE); 306 for (byte[] cQualifier : familyMap.keySet()) { 307 // Ignore the meta data fields of the queue 308 if (Arrays.equals(cQualifier, COL_QUEUE_OWNER) || Arrays.equals(cQualifier, 309 COL_QUEUE_OWNER_HISTORY)) { 310 continue; 311 } 312 wals.add(Bytes.toString(cQualifier)); 313 } 314 return wals; 315 } 316 317 /** 318 * Get the queue id's and meta data (Owner and History) for the queues belonging to the named 319 * server 320 * 321 * @param server name of the server 322 * @return a ResultScanner over the QueueIds belonging to the server 323 * @throws IOException 324 */ 325 protected ResultScanner getQueuesBelongingToServer(String server) throws IOException { 326 Scan scan = new Scan(); 327 SingleColumnValueFilter filterMyQueues = new SingleColumnValueFilter(CF_QUEUE, COL_QUEUE_OWNER, 328 CompareOperator.EQUAL, Bytes.toBytes(server)); 329 scan.setFilter(filterMyQueues); 330 scan.addColumn(CF_QUEUE, COL_QUEUE_OWNER); 331 scan.addColumn(CF_QUEUE, COL_QUEUE_OWNER_HISTORY); 332 try (Table replicationTable = getOrBlockOnReplicationTable()) { 333 ResultScanner results = replicationTable.getScanner(scan); 334 return results; 335 } 336 } 337 338 /** 339 * Attempts to acquire the Replication Table. This operation will block until it is assigned by 340 * the CreateReplicationWorker thread. It is up to the caller of this method to close the 341 * returned Table 342 * @return the Replication Table when it is created 343 * @throws IOException 344 */ 345 protected Table getOrBlockOnReplicationTable() throws IOException { 346 // Sleep until the Replication Table becomes available 347 try { 348 replicationTableInitialized.await(); 349 } catch (InterruptedException e) { 350 String errMsg = "Unable to acquire the Replication Table due to InterruptedException: " + 351 e.getMessage(); 352 throw new InterruptedIOException(errMsg); 353 } 354 return getAndSetUpReplicationTable(); 355 } 356 357 /** 358 * Creates a new copy of the Replication Table and sets up the proper Table time outs for it 359 * 360 * @return the Replication Table 361 * @throws IOException 362 */ 363 private Table getAndSetUpReplicationTable() throws IOException { 364 Table replicationTable = connection.getTable(REPLICATION_TABLE_NAME); 365 setReplicationTableTimeOuts(replicationTable); 366 return replicationTable; 367 } 368 369 /** 370 * Builds the Replication Table in a background thread. Any method accessing the Replication Table 371 * should do so through getOrBlockOnReplicationTable() 372 * 373 * @return the Replication Table 374 * @throws IOException if the Replication Table takes too long to build 375 */ 376 private void createReplicationTableInBackground() throws IOException { 377 executor.execute(new CreateReplicationTableWorker()); 378 } 379 380 /** 381 * Attempts to build the Replication Table. Will continue blocking until we have a valid 382 * Table for the Replication Table. 383 */ 384 private class CreateReplicationTableWorker implements Runnable { 385 386 private Admin admin; 387 388 @Override 389 public void run() { 390 try { 391 admin = connection.getAdmin(); 392 if (!replicationTableExists()) { 393 createReplicationTable(); 394 } 395 int maxRetries = conf.getInt("hbase.replication.queues.createtable.retries.number", 396 CLIENT_RETRIES); 397 RetryCounterFactory counterFactory = new RetryCounterFactory(maxRetries, RPC_TIMEOUT); 398 RetryCounter retryCounter = counterFactory.create(); 399 while (!replicationTableExists()) { 400 retryCounter.sleepUntilNextRetry(); 401 if (!retryCounter.shouldRetry()) { 402 throw new IOException("Unable to acquire the Replication Table"); 403 } 404 } 405 replicationTableInitialized.countDown(); 406 } catch (IOException | InterruptedException e) { 407 abortable.abort("Failed building Replication Table", e); 408 } 409 } 410 411 /** 412 * Create the replication table with the provided HColumnDescriptor REPLICATION_COL_DESCRIPTOR 413 * in TableBasedReplicationQueuesImpl 414 * 415 * @throws IOException 416 */ 417 private void createReplicationTable() throws IOException { 418 HTableDescriptor replicationTableDescriptor = new HTableDescriptor(REPLICATION_TABLE_NAME); 419 replicationTableDescriptor.addFamily(REPLICATION_COL_DESCRIPTOR); 420 try { 421 admin.createTable(replicationTableDescriptor); 422 } catch (TableExistsException e) { 423 // In this case we can just continue as normal 424 } 425 } 426 427 /** 428 * Checks whether the Replication Table exists yet 429 * 430 * @return whether the Replication Table exists 431 * @throws IOException 432 */ 433 private boolean replicationTableExists() { 434 try { 435 return admin.tableExists(REPLICATION_TABLE_NAME); 436 } catch (IOException e) { 437 return false; 438 } 439 } 440 } 441}