001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import com.google.protobuf.Descriptors; 021import com.google.protobuf.Message; 022import com.google.protobuf.RpcController; 023 024import java.io.Closeable; 025import java.io.IOException; 026import java.io.InterruptedIOException; 027import java.util.ArrayList; 028import java.util.Arrays; 029import java.util.Collections; 030import java.util.EnumSet; 031import java.util.HashMap; 032import java.util.Iterator; 033import java.util.LinkedList; 034import java.util.List; 035import java.util.Map; 036import java.util.Set; 037import java.util.concurrent.Callable; 038import java.util.concurrent.ExecutionException; 039import java.util.concurrent.Future; 040import java.util.concurrent.TimeUnit; 041import java.util.concurrent.TimeoutException; 042import java.util.concurrent.atomic.AtomicInteger; 043import java.util.concurrent.atomic.AtomicReference; 044import java.util.function.Supplier; 045import java.util.regex.Pattern; 046import java.util.stream.Collectors; 047import java.util.stream.Stream; 048 049import org.apache.hadoop.conf.Configuration; 050import org.apache.hadoop.hbase.Abortable; 051import org.apache.hadoop.hbase.CacheEvictionStats; 052import org.apache.hadoop.hbase.CacheEvictionStatsBuilder; 053import org.apache.hadoop.hbase.ClusterMetrics; 054import org.apache.hadoop.hbase.ClusterMetrics.Option; 055import org.apache.hadoop.hbase.ClusterMetricsBuilder; 056import org.apache.hadoop.hbase.DoNotRetryIOException; 057import org.apache.hadoop.hbase.HBaseConfiguration; 058import org.apache.hadoop.hbase.HConstants; 059import org.apache.hadoop.hbase.HRegionInfo; 060import org.apache.hadoop.hbase.HRegionLocation; 061import org.apache.hadoop.hbase.HTableDescriptor; 062import org.apache.hadoop.hbase.MasterNotRunningException; 063import org.apache.hadoop.hbase.MetaTableAccessor; 064import org.apache.hadoop.hbase.NamespaceDescriptor; 065import org.apache.hadoop.hbase.NamespaceNotFoundException; 066import org.apache.hadoop.hbase.NotServingRegionException; 067import org.apache.hadoop.hbase.RegionLocations; 068import org.apache.hadoop.hbase.RegionMetrics; 069import org.apache.hadoop.hbase.RegionMetricsBuilder; 070import org.apache.hadoop.hbase.ServerName; 071import org.apache.hadoop.hbase.TableExistsException; 072import org.apache.hadoop.hbase.TableName; 073import org.apache.hadoop.hbase.TableNotDisabledException; 074import org.apache.hadoop.hbase.TableNotFoundException; 075import org.apache.hadoop.hbase.UnknownRegionException; 076import org.apache.hadoop.hbase.ZooKeeperConnectionException; 077import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil; 078import org.apache.hadoop.hbase.client.replication.TableCFs; 079import org.apache.hadoop.hbase.client.security.SecurityCapability; 080import org.apache.hadoop.hbase.exceptions.TimeoutIOException; 081import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel; 082import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils; 083import org.apache.hadoop.hbase.ipc.HBaseRpcController; 084import org.apache.hadoop.hbase.ipc.RpcControllerFactory; 085import org.apache.hadoop.hbase.quotas.QuotaFilter; 086import org.apache.hadoop.hbase.quotas.QuotaRetriever; 087import org.apache.hadoop.hbase.quotas.QuotaSettings; 088import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException; 089import org.apache.hadoop.hbase.replication.ReplicationException; 090import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; 091import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; 092import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils; 093import org.apache.hadoop.hbase.snapshot.HBaseSnapshotException; 094import org.apache.hadoop.hbase.snapshot.RestoreSnapshotException; 095import org.apache.hadoop.hbase.snapshot.SnapshotCreationException; 096import org.apache.hadoop.hbase.snapshot.UnknownSnapshotException; 097import org.apache.hadoop.hbase.util.Addressing; 098import org.apache.hadoop.hbase.util.Bytes; 099import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 100import org.apache.hadoop.hbase.util.ForeignExceptionUtil; 101import org.apache.hadoop.hbase.util.Pair; 102import org.apache.hadoop.ipc.RemoteException; 103import org.apache.hadoop.util.StringUtils; 104import org.apache.yetus.audience.InterfaceAudience; 105import org.apache.yetus.audience.InterfaceStability; 106import org.slf4j.Logger; 107import org.slf4j.LoggerFactory; 108 109import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 110import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; 111import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 112import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; 113import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos; 114import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService; 115import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesRequest; 116import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheRequest; 117import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheResponse; 118import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest; 119import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest; 120import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest; 121import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse; 122import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterRequest; 123import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterResponse; 124import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerRequest; 125import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationRequest; 126import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; 127import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceRequest; 128import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceResponse; 129import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; 130import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ProcedureDescription; 131import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType; 132import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.TableSchema; 133import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos; 134import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AbortProcedureRequest; 135import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AbortProcedureResponse; 136import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AddColumnRequest; 137import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AddColumnResponse; 138import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AssignRegionRequest; 139import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClearDeadServersRequest; 140import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateNamespaceRequest; 141import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateNamespaceResponse; 142import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableRequest; 143import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableResponse; 144import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColumnRequest; 145import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColumnResponse; 146import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteNamespaceRequest; 147import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteNamespaceResponse; 148import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteSnapshotRequest; 149import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTableRequest; 150import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTableResponse; 151import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableRequest; 152import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableResponse; 153import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableRequest; 154import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableResponse; 155import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureRequest; 156import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureResponse; 157import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusRequest; 158import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsRequest; 159import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetLocksRequest; 160import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetLocksResponse; 161import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorRequest; 162import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultRequest; 163import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultResponse; 164import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProceduresRequest; 165import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProceduresResponse; 166import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetSchemaAlterStatusRequest; 167import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetSchemaAlterStatusResponse; 168import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableDescriptorsRequest; 169import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableDescriptorsResponse; 170import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableNamesRequest; 171import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsInMaintenanceModeRequest; 172import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsInMaintenanceModeResponse; 173import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneRequest; 174import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneResponse; 175import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneRequest; 176import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneResponse; 177import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDecommissionedRegionServersRequest; 178import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsRequest; 179import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceRequest; 180import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByNamespaceRequest; 181import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampForRegionRequest; 182import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampRequest; 183import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MergeTableRegionsRequest; 184import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MergeTableRegionsResponse; 185import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnRequest; 186import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnResponse; 187import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyNamespaceRequest; 188import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyNamespaceResponse; 189import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableRequest; 190import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableResponse; 191import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MoveRegionRequest; 192import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotRequest; 193import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotResponse; 194import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCapabilitiesRequest; 195import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningRequest; 196import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningRequest; 197import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ShutdownRequest; 198import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SnapshotRequest; 199import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SnapshotResponse; 200import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SplitTableRegionRequest; 201import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SplitTableRegionResponse; 202import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterRequest; 203import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableRequest; 204import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableResponse; 205import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionRequest; 206import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos; 207import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerResponse; 208import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerResponse; 209import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerResponse; 210import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigResponse; 211import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerResponse; 212import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigResponse; 213import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos; 214 215/** 216 * HBaseAdmin is no longer a client API. It is marked InterfaceAudience.Private indicating that 217 * this is an HBase-internal class as defined in 218 * https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/InterfaceClassification.html 219 * There are no guarantees for backwards source / binary compatibility and methods or class can 220 * change or go away without deprecation. 221 * Use {@link Connection#getAdmin()} to obtain an instance of {@link Admin} instead of constructing 222 * an HBaseAdmin directly. 223 * 224 * <p>Connection should be an <i>unmanaged</i> connection obtained via 225 * {@link ConnectionFactory#createConnection(Configuration)} 226 * 227 * @see ConnectionFactory 228 * @see Connection 229 * @see Admin 230 */ 231@InterfaceAudience.Private 232public class HBaseAdmin implements Admin { 233 private static final Logger LOG = LoggerFactory.getLogger(HBaseAdmin.class); 234 235 private ClusterConnection connection; 236 237 private final Configuration conf; 238 private final long pause; 239 private final int numRetries; 240 private final int syncWaitTimeout; 241 private boolean aborted; 242 private int operationTimeout; 243 private int rpcTimeout; 244 private int getProcedureTimeout; 245 246 private RpcRetryingCallerFactory rpcCallerFactory; 247 private RpcControllerFactory rpcControllerFactory; 248 249 private NonceGenerator ng; 250 251 @Override 252 public int getOperationTimeout() { 253 return operationTimeout; 254 } 255 256 HBaseAdmin(ClusterConnection connection) throws IOException { 257 this.conf = connection.getConfiguration(); 258 this.connection = connection; 259 260 // TODO: receive ConnectionConfiguration here rather than re-parsing these configs every time. 261 this.pause = this.conf.getLong(HConstants.HBASE_CLIENT_PAUSE, 262 HConstants.DEFAULT_HBASE_CLIENT_PAUSE); 263 this.numRetries = this.conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 264 HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); 265 this.operationTimeout = this.conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 266 HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT); 267 this.rpcTimeout = this.conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 268 HConstants.DEFAULT_HBASE_RPC_TIMEOUT); 269 this.syncWaitTimeout = this.conf.getInt( 270 "hbase.client.sync.wait.timeout.msec", 10 * 60000); // 10min 271 this.getProcedureTimeout = 272 this.conf.getInt("hbase.client.procedure.future.get.timeout.msec", 10 * 60000); // 10min 273 274 this.rpcCallerFactory = connection.getRpcRetryingCallerFactory(); 275 this.rpcControllerFactory = connection.getRpcControllerFactory(); 276 277 this.ng = this.connection.getNonceGenerator(); 278 } 279 280 @Override 281 public void abort(String why, Throwable e) { 282 // Currently does nothing but throw the passed message and exception 283 this.aborted = true; 284 throw new RuntimeException(why, e); 285 } 286 287 @Override 288 public boolean isAborted() { 289 return this.aborted; 290 } 291 292 @Override 293 public boolean abortProcedure(final long procId, final boolean mayInterruptIfRunning) 294 throws IOException { 295 return get(abortProcedureAsync(procId, mayInterruptIfRunning), this.syncWaitTimeout, 296 TimeUnit.MILLISECONDS); 297 } 298 299 @Override 300 public Future<Boolean> abortProcedureAsync(final long procId, final boolean mayInterruptIfRunning) 301 throws IOException { 302 Boolean abortProcResponse = 303 executeCallable(new MasterCallable<AbortProcedureResponse>(getConnection(), 304 getRpcControllerFactory()) { 305 @Override 306 protected AbortProcedureResponse rpcCall() throws Exception { 307 AbortProcedureRequest abortProcRequest = 308 AbortProcedureRequest.newBuilder().setProcId(procId).build(); 309 return master.abortProcedure(getRpcController(), abortProcRequest); 310 } 311 }).getIsProcedureAborted(); 312 return new AbortProcedureFuture(this, procId, abortProcResponse); 313 } 314 315 @Override 316 public List<TableDescriptor> listTableDescriptors() throws IOException { 317 return listTableDescriptors((Pattern)null, false); 318 } 319 320 @Override 321 public List<TableDescriptor> listTableDescriptors(Pattern pattern) throws IOException { 322 return listTableDescriptors(pattern, false); 323 } 324 325 @Override 326 public List<TableDescriptor> listTableDescriptors(Pattern pattern, boolean includeSysTables) 327 throws IOException { 328 return executeCallable(new MasterCallable<List<TableDescriptor>>(getConnection(), 329 getRpcControllerFactory()) { 330 @Override 331 protected List<TableDescriptor> rpcCall() throws Exception { 332 GetTableDescriptorsRequest req = 333 RequestConverter.buildGetTableDescriptorsRequest(pattern, includeSysTables); 334 return ProtobufUtil.toTableDescriptorList(master.getTableDescriptors(getRpcController(), 335 req)); 336 } 337 }); 338 } 339 340 @Override 341 public TableDescriptor getDescriptor(TableName tableName) 342 throws TableNotFoundException, IOException { 343 return getTableDescriptor(tableName, getConnection(), rpcCallerFactory, rpcControllerFactory, 344 operationTimeout, rpcTimeout); 345 } 346 347 @Override 348 public void modifyTable(TableDescriptor td) throws IOException { 349 get(modifyTableAsync(td), syncWaitTimeout, TimeUnit.MILLISECONDS); 350 } 351 352 @Override 353 public Future<Void> modifyTableAsync(TableDescriptor td) throws IOException { 354 ModifyTableResponse response = executeCallable( 355 new MasterCallable<ModifyTableResponse>(getConnection(), getRpcControllerFactory()) { 356 Long nonceGroup = ng.getNonceGroup(); 357 Long nonce = ng.newNonce(); 358 @Override 359 protected ModifyTableResponse rpcCall() throws Exception { 360 setPriority(td.getTableName()); 361 ModifyTableRequest request = RequestConverter.buildModifyTableRequest( 362 td.getTableName(), td, nonceGroup, nonce); 363 return master.modifyTable(getRpcController(), request); 364 } 365 }); 366 return new ModifyTableFuture(this, td.getTableName(), response); 367 } 368 369 @Override 370 public List<TableDescriptor> listTableDescriptorsByNamespace(byte[] name) throws IOException { 371 return executeCallable(new MasterCallable<List<TableDescriptor>>(getConnection(), 372 getRpcControllerFactory()) { 373 @Override 374 protected List<TableDescriptor> rpcCall() throws Exception { 375 return master.listTableDescriptorsByNamespace(getRpcController(), 376 ListTableDescriptorsByNamespaceRequest.newBuilder() 377 .setNamespaceName(Bytes.toString(name)).build()) 378 .getTableSchemaList() 379 .stream() 380 .map(ProtobufUtil::toTableDescriptor) 381 .collect(Collectors.toList()); 382 } 383 }); 384 } 385 386 @Override 387 public List<TableDescriptor> listTableDescriptors(List<TableName> tableNames) throws IOException { 388 return executeCallable(new MasterCallable<List<TableDescriptor>>(getConnection(), 389 getRpcControllerFactory()) { 390 @Override 391 protected List<TableDescriptor> rpcCall() throws Exception { 392 GetTableDescriptorsRequest req = 393 RequestConverter.buildGetTableDescriptorsRequest(tableNames); 394 return ProtobufUtil.toTableDescriptorList(master.getTableDescriptors(getRpcController(), 395 req)); 396 } 397 }); 398 } 399 400 @Override 401 public List<RegionInfo> getRegions(final ServerName sn) throws IOException { 402 AdminService.BlockingInterface admin = this.connection.getAdmin(sn); 403 // TODO: There is no timeout on this controller. Set one! 404 HBaseRpcController controller = rpcControllerFactory.newController(); 405 return ProtobufUtil.getOnlineRegions(controller, admin); 406 } 407 408 @Override 409 public List<RegionInfo> getRegions(TableName tableName) throws IOException { 410 if (TableName.isMetaTableName(tableName)) { 411 return Arrays.asList(RegionInfoBuilder.FIRST_META_REGIONINFO); 412 } else { 413 return MetaTableAccessor.getTableRegions(connection, tableName, true); 414 } 415 } 416 417 private static class AbortProcedureFuture extends ProcedureFuture<Boolean> { 418 private boolean isAbortInProgress; 419 420 public AbortProcedureFuture( 421 final HBaseAdmin admin, 422 final Long procId, 423 final Boolean abortProcResponse) { 424 super(admin, procId); 425 this.isAbortInProgress = abortProcResponse; 426 } 427 428 @Override 429 public Boolean get(long timeout, TimeUnit unit) 430 throws InterruptedException, ExecutionException, TimeoutException { 431 if (!this.isAbortInProgress) { 432 return false; 433 } 434 super.get(timeout, unit); 435 return true; 436 } 437 } 438 439 /** @return Connection used by this object. */ 440 @Override 441 public Connection getConnection() { 442 return connection; 443 } 444 445 @Override 446 public boolean tableExists(final TableName tableName) throws IOException { 447 return executeCallable(new RpcRetryingCallable<Boolean>() { 448 @Override 449 protected Boolean rpcCall(int callTimeout) throws Exception { 450 return MetaTableAccessor.tableExists(connection, tableName); 451 } 452 }); 453 } 454 455 @Override 456 public HTableDescriptor[] listTables() throws IOException { 457 return listTables((Pattern)null, false); 458 } 459 460 @Override 461 public HTableDescriptor[] listTables(Pattern pattern) throws IOException { 462 return listTables(pattern, false); 463 } 464 465 @Override 466 public HTableDescriptor[] listTables(String regex) throws IOException { 467 return listTables(Pattern.compile(regex), false); 468 } 469 470 @Override 471 public HTableDescriptor[] listTables(final Pattern pattern, final boolean includeSysTables) 472 throws IOException { 473 return executeCallable(new MasterCallable<HTableDescriptor[]>(getConnection(), 474 getRpcControllerFactory()) { 475 @Override 476 protected HTableDescriptor[] rpcCall() throws Exception { 477 GetTableDescriptorsRequest req = 478 RequestConverter.buildGetTableDescriptorsRequest(pattern, includeSysTables); 479 return ProtobufUtil.toTableDescriptorList(master.getTableDescriptors(getRpcController(), 480 req)).stream().map(ImmutableHTableDescriptor::new).toArray(HTableDescriptor[]::new); 481 } 482 }); 483 } 484 485 @Override 486 public HTableDescriptor[] listTables(String regex, boolean includeSysTables) 487 throws IOException { 488 return listTables(Pattern.compile(regex), includeSysTables); 489 } 490 491 @Override 492 public TableName[] listTableNames() throws IOException { 493 return listTableNames((Pattern)null, false); 494 } 495 496 @Override 497 public TableName[] listTableNames(Pattern pattern) throws IOException { 498 return listTableNames(pattern, false); 499 } 500 501 @Override 502 public TableName[] listTableNames(String regex) throws IOException { 503 return listTableNames(Pattern.compile(regex), false); 504 } 505 506 @Override 507 public TableName[] listTableNames(final Pattern pattern, final boolean includeSysTables) 508 throws IOException { 509 return executeCallable(new MasterCallable<TableName[]>(getConnection(), 510 getRpcControllerFactory()) { 511 @Override 512 protected TableName[] rpcCall() throws Exception { 513 GetTableNamesRequest req = 514 RequestConverter.buildGetTableNamesRequest(pattern, includeSysTables); 515 return ProtobufUtil.getTableNameArray(master.getTableNames(getRpcController(), req) 516 .getTableNamesList()); 517 } 518 }); 519 } 520 521 @Override 522 public TableName[] listTableNames(final String regex, final boolean includeSysTables) 523 throws IOException { 524 return listTableNames(Pattern.compile(regex), includeSysTables); 525 } 526 527 @Override 528 public HTableDescriptor getTableDescriptor(final TableName tableName) throws IOException { 529 return getHTableDescriptor(tableName, getConnection(), rpcCallerFactory, rpcControllerFactory, 530 operationTimeout, rpcTimeout); 531 } 532 533 static TableDescriptor getTableDescriptor(final TableName tableName, Connection connection, 534 RpcRetryingCallerFactory rpcCallerFactory, final RpcControllerFactory rpcControllerFactory, 535 int operationTimeout, int rpcTimeout) throws IOException { 536 if (tableName == null) return null; 537 TableDescriptor td = 538 executeCallable(new MasterCallable<TableDescriptor>(connection, rpcControllerFactory) { 539 @Override 540 protected TableDescriptor rpcCall() throws Exception { 541 GetTableDescriptorsRequest req = 542 RequestConverter.buildGetTableDescriptorsRequest(tableName); 543 GetTableDescriptorsResponse htds = master.getTableDescriptors(getRpcController(), req); 544 if (!htds.getTableSchemaList().isEmpty()) { 545 return ProtobufUtil.toTableDescriptor(htds.getTableSchemaList().get(0)); 546 } 547 return null; 548 } 549 }, rpcCallerFactory, operationTimeout, rpcTimeout); 550 if (td != null) { 551 return td; 552 } 553 throw new TableNotFoundException(tableName.getNameAsString()); 554 } 555 556 /** 557 * @deprecated since 2.0 version and will be removed in 3.0 version. 558 * use {@link #getTableDescriptor(TableName, 559 * Connection, RpcRetryingCallerFactory,RpcControllerFactory,int,int)} 560 */ 561 @Deprecated 562 static HTableDescriptor getHTableDescriptor(final TableName tableName, Connection connection, 563 RpcRetryingCallerFactory rpcCallerFactory, final RpcControllerFactory rpcControllerFactory, 564 int operationTimeout, int rpcTimeout) throws IOException { 565 if (tableName == null) { 566 return null; 567 } 568 HTableDescriptor htd = 569 executeCallable(new MasterCallable<HTableDescriptor>(connection, rpcControllerFactory) { 570 @Override 571 protected HTableDescriptor rpcCall() throws Exception { 572 GetTableDescriptorsRequest req = 573 RequestConverter.buildGetTableDescriptorsRequest(tableName); 574 GetTableDescriptorsResponse htds = master.getTableDescriptors(getRpcController(), req); 575 if (!htds.getTableSchemaList().isEmpty()) { 576 return new ImmutableHTableDescriptor( 577 ProtobufUtil.toTableDescriptor(htds.getTableSchemaList().get(0))); 578 } 579 return null; 580 } 581 }, rpcCallerFactory, operationTimeout, rpcTimeout); 582 if (htd != null) { 583 return new ImmutableHTableDescriptor(htd); 584 } 585 throw new TableNotFoundException(tableName.getNameAsString()); 586 } 587 588 private long getPauseTime(int tries) { 589 int triesCount = tries; 590 if (triesCount >= HConstants.RETRY_BACKOFF.length) { 591 triesCount = HConstants.RETRY_BACKOFF.length - 1; 592 } 593 return this.pause * HConstants.RETRY_BACKOFF[triesCount]; 594 } 595 596 @Override 597 public void createTable(TableDescriptor desc) 598 throws IOException { 599 createTable(desc, null); 600 } 601 602 @Override 603 public void createTable(TableDescriptor desc, byte [] startKey, 604 byte [] endKey, int numRegions) 605 throws IOException { 606 if(numRegions < 3) { 607 throw new IllegalArgumentException("Must create at least three regions"); 608 } else if(Bytes.compareTo(startKey, endKey) >= 0) { 609 throw new IllegalArgumentException("Start key must be smaller than end key"); 610 } 611 if (numRegions == 3) { 612 createTable(desc, new byte[][]{startKey, endKey}); 613 return; 614 } 615 byte [][] splitKeys = Bytes.split(startKey, endKey, numRegions - 3); 616 if(splitKeys == null || splitKeys.length != numRegions - 1) { 617 throw new IllegalArgumentException("Unable to split key range into enough regions"); 618 } 619 createTable(desc, splitKeys); 620 } 621 622 @Override 623 public void createTable(final TableDescriptor desc, byte [][] splitKeys) 624 throws IOException { 625 get(createTableAsync(desc, splitKeys), syncWaitTimeout, TimeUnit.MILLISECONDS); 626 } 627 628 @Override 629 public Future<Void> createTableAsync(final TableDescriptor desc, final byte[][] splitKeys) 630 throws IOException { 631 if (desc.getTableName() == null) { 632 throw new IllegalArgumentException("TableName cannot be null"); 633 } 634 if (splitKeys != null && splitKeys.length > 0) { 635 Arrays.sort(splitKeys, Bytes.BYTES_COMPARATOR); 636 // Verify there are no duplicate split keys 637 byte[] lastKey = null; 638 for (byte[] splitKey : splitKeys) { 639 if (Bytes.compareTo(splitKey, HConstants.EMPTY_BYTE_ARRAY) == 0) { 640 throw new IllegalArgumentException( 641 "Empty split key must not be passed in the split keys."); 642 } 643 if (lastKey != null && Bytes.equals(splitKey, lastKey)) { 644 throw new IllegalArgumentException("All split keys must be unique, " + 645 "found duplicate: " + Bytes.toStringBinary(splitKey) + 646 ", " + Bytes.toStringBinary(lastKey)); 647 } 648 lastKey = splitKey; 649 } 650 } 651 652 CreateTableResponse response = executeCallable( 653 new MasterCallable<CreateTableResponse>(getConnection(), getRpcControllerFactory()) { 654 Long nonceGroup = ng.getNonceGroup(); 655 Long nonce = ng.newNonce(); 656 @Override 657 protected CreateTableResponse rpcCall() throws Exception { 658 setPriority(desc.getTableName()); 659 CreateTableRequest request = RequestConverter.buildCreateTableRequest( 660 desc, splitKeys, nonceGroup, nonce); 661 return master.createTable(getRpcController(), request); 662 } 663 }); 664 return new CreateTableFuture(this, desc, splitKeys, response); 665 } 666 667 private static class CreateTableFuture extends TableFuture<Void> { 668 private final TableDescriptor desc; 669 private final byte[][] splitKeys; 670 671 public CreateTableFuture(final HBaseAdmin admin, final TableDescriptor desc, 672 final byte[][] splitKeys, final CreateTableResponse response) { 673 super(admin, desc.getTableName(), 674 (response != null && response.hasProcId()) ? response.getProcId() : null); 675 this.splitKeys = splitKeys; 676 this.desc = desc; 677 } 678 679 @Override 680 protected TableDescriptor getTableDescriptor() { 681 return desc; 682 } 683 684 @Override 685 public String getOperationType() { 686 return "CREATE"; 687 } 688 689 @Override 690 protected Void waitOperationResult(final long deadlineTs) throws IOException, TimeoutException { 691 waitForTableEnabled(deadlineTs); 692 waitForAllRegionsOnline(deadlineTs, splitKeys); 693 return null; 694 } 695 } 696 697 @Override 698 public void deleteTable(final TableName tableName) throws IOException { 699 get(deleteTableAsync(tableName), syncWaitTimeout, TimeUnit.MILLISECONDS); 700 } 701 702 @Override 703 public Future<Void> deleteTableAsync(final TableName tableName) throws IOException { 704 DeleteTableResponse response = executeCallable( 705 new MasterCallable<DeleteTableResponse>(getConnection(), getRpcControllerFactory()) { 706 Long nonceGroup = ng.getNonceGroup(); 707 Long nonce = ng.newNonce(); 708 @Override 709 protected DeleteTableResponse rpcCall() throws Exception { 710 setPriority(tableName); 711 DeleteTableRequest req = 712 RequestConverter.buildDeleteTableRequest(tableName, nonceGroup,nonce); 713 return master.deleteTable(getRpcController(), req); 714 } 715 }); 716 return new DeleteTableFuture(this, tableName, response); 717 } 718 719 private static class DeleteTableFuture extends TableFuture<Void> { 720 public DeleteTableFuture(final HBaseAdmin admin, final TableName tableName, 721 final DeleteTableResponse response) { 722 super(admin, tableName, 723 (response != null && response.hasProcId()) ? response.getProcId() : null); 724 } 725 726 @Override 727 public String getOperationType() { 728 return "DELETE"; 729 } 730 731 @Override 732 protected Void waitOperationResult(final long deadlineTs) 733 throws IOException, TimeoutException { 734 waitTableNotFound(deadlineTs); 735 return null; 736 } 737 738 @Override 739 protected Void postOperationResult(final Void result, final long deadlineTs) 740 throws IOException, TimeoutException { 741 // Delete cached information to prevent clients from using old locations 742 ((ClusterConnection) getAdmin().getConnection()).clearRegionCache(getTableName()); 743 return super.postOperationResult(result, deadlineTs); 744 } 745 } 746 747 @Override 748 public HTableDescriptor[] deleteTables(String regex) throws IOException { 749 return deleteTables(Pattern.compile(regex)); 750 } 751 752 /** 753 * Delete tables matching the passed in pattern and wait on completion. 754 * 755 * Warning: Use this method carefully, there is no prompting and the effect is 756 * immediate. Consider using {@link #listTables(java.util.regex.Pattern) } and 757 * {@link #deleteTable(TableName)} 758 * 759 * @param pattern The pattern to match table names against 760 * @return Table descriptors for tables that couldn't be deleted 761 * @throws IOException 762 */ 763 @Override 764 public HTableDescriptor[] deleteTables(Pattern pattern) throws IOException { 765 List<HTableDescriptor> failed = new LinkedList<>(); 766 for (HTableDescriptor table : listTables(pattern)) { 767 try { 768 deleteTable(table.getTableName()); 769 } catch (IOException ex) { 770 LOG.info("Failed to delete table " + table.getTableName(), ex); 771 failed.add(table); 772 } 773 } 774 return failed.toArray(new HTableDescriptor[failed.size()]); 775 } 776 777 @Override 778 public void truncateTable(final TableName tableName, final boolean preserveSplits) 779 throws IOException { 780 get(truncateTableAsync(tableName, preserveSplits), syncWaitTimeout, TimeUnit.MILLISECONDS); 781 } 782 783 @Override 784 public Future<Void> truncateTableAsync(final TableName tableName, final boolean preserveSplits) 785 throws IOException { 786 TruncateTableResponse response = 787 executeCallable(new MasterCallable<TruncateTableResponse>(getConnection(), 788 getRpcControllerFactory()) { 789 Long nonceGroup = ng.getNonceGroup(); 790 Long nonce = ng.newNonce(); 791 @Override 792 protected TruncateTableResponse rpcCall() throws Exception { 793 setPriority(tableName); 794 LOG.info("Started truncating " + tableName); 795 TruncateTableRequest req = RequestConverter.buildTruncateTableRequest( 796 tableName, preserveSplits, nonceGroup, nonce); 797 return master.truncateTable(getRpcController(), req); 798 } 799 }); 800 return new TruncateTableFuture(this, tableName, preserveSplits, response); 801 } 802 803 private static class TruncateTableFuture extends TableFuture<Void> { 804 private final boolean preserveSplits; 805 806 public TruncateTableFuture(final HBaseAdmin admin, final TableName tableName, 807 final boolean preserveSplits, final TruncateTableResponse response) { 808 super(admin, tableName, 809 (response != null && response.hasProcId()) ? response.getProcId() : null); 810 this.preserveSplits = preserveSplits; 811 } 812 813 @Override 814 public String getOperationType() { 815 return "TRUNCATE"; 816 } 817 818 @Override 819 protected Void waitOperationResult(final long deadlineTs) throws IOException, TimeoutException { 820 waitForTableEnabled(deadlineTs); 821 // once the table is enabled, we know the operation is done. so we can fetch the splitKeys 822 byte[][] splitKeys = preserveSplits ? getAdmin().getTableSplits(getTableName()) : null; 823 waitForAllRegionsOnline(deadlineTs, splitKeys); 824 return null; 825 } 826 } 827 828 private byte[][] getTableSplits(final TableName tableName) throws IOException { 829 byte[][] splits = null; 830 try (RegionLocator locator = getConnection().getRegionLocator(tableName)) { 831 byte[][] startKeys = locator.getStartKeys(); 832 if (startKeys.length == 1) { 833 return splits; 834 } 835 splits = new byte[startKeys.length - 1][]; 836 for (int i = 1; i < startKeys.length; i++) { 837 splits[i - 1] = startKeys[i]; 838 } 839 } 840 return splits; 841 } 842 843 @Override 844 public void enableTable(final TableName tableName) 845 throws IOException { 846 get(enableTableAsync(tableName), syncWaitTimeout, TimeUnit.MILLISECONDS); 847 } 848 849 @Override 850 public Future<Void> enableTableAsync(final TableName tableName) throws IOException { 851 TableName.isLegalFullyQualifiedTableName(tableName.getName()); 852 EnableTableResponse response = executeCallable( 853 new MasterCallable<EnableTableResponse>(getConnection(), getRpcControllerFactory()) { 854 Long nonceGroup = ng.getNonceGroup(); 855 Long nonce = ng.newNonce(); 856 @Override 857 protected EnableTableResponse rpcCall() throws Exception { 858 setPriority(tableName); 859 LOG.info("Started enable of " + tableName); 860 EnableTableRequest req = 861 RequestConverter.buildEnableTableRequest(tableName, nonceGroup, nonce); 862 return master.enableTable(getRpcController(),req); 863 } 864 }); 865 return new EnableTableFuture(this, tableName, response); 866 } 867 868 private static class EnableTableFuture extends TableFuture<Void> { 869 public EnableTableFuture(final HBaseAdmin admin, final TableName tableName, 870 final EnableTableResponse response) { 871 super(admin, tableName, 872 (response != null && response.hasProcId()) ? response.getProcId() : null); 873 } 874 875 @Override 876 public String getOperationType() { 877 return "ENABLE"; 878 } 879 880 @Override 881 protected Void waitOperationResult(final long deadlineTs) throws IOException, TimeoutException { 882 waitForTableEnabled(deadlineTs); 883 return null; 884 } 885 } 886 887 @Override 888 public HTableDescriptor[] enableTables(String regex) throws IOException { 889 return enableTables(Pattern.compile(regex)); 890 } 891 892 @Override 893 public HTableDescriptor[] enableTables(Pattern pattern) throws IOException { 894 List<HTableDescriptor> failed = new LinkedList<>(); 895 for (HTableDescriptor table : listTables(pattern)) { 896 if (isTableDisabled(table.getTableName())) { 897 try { 898 enableTable(table.getTableName()); 899 } catch (IOException ex) { 900 LOG.info("Failed to enable table " + table.getTableName(), ex); 901 failed.add(table); 902 } 903 } 904 } 905 return failed.toArray(new HTableDescriptor[failed.size()]); 906 } 907 908 @Override 909 public void disableTable(final TableName tableName) 910 throws IOException { 911 get(disableTableAsync(tableName), syncWaitTimeout, TimeUnit.MILLISECONDS); 912 } 913 914 @Override 915 public Future<Void> disableTableAsync(final TableName tableName) throws IOException { 916 TableName.isLegalFullyQualifiedTableName(tableName.getName()); 917 DisableTableResponse response = executeCallable( 918 new MasterCallable<DisableTableResponse>(getConnection(), getRpcControllerFactory()) { 919 Long nonceGroup = ng.getNonceGroup(); 920 Long nonce = ng.newNonce(); 921 @Override 922 protected DisableTableResponse rpcCall() throws Exception { 923 setPriority(tableName); 924 LOG.info("Started disable of " + tableName); 925 DisableTableRequest req = 926 RequestConverter.buildDisableTableRequest( 927 tableName, nonceGroup, nonce); 928 return master.disableTable(getRpcController(), req); 929 } 930 }); 931 return new DisableTableFuture(this, tableName, response); 932 } 933 934 private static class DisableTableFuture extends TableFuture<Void> { 935 public DisableTableFuture(final HBaseAdmin admin, final TableName tableName, 936 final DisableTableResponse response) { 937 super(admin, tableName, 938 (response != null && response.hasProcId()) ? response.getProcId() : null); 939 } 940 941 @Override 942 public String getOperationType() { 943 return "DISABLE"; 944 } 945 946 @Override 947 protected Void waitOperationResult(long deadlineTs) throws IOException, TimeoutException { 948 waitForTableDisabled(deadlineTs); 949 return null; 950 } 951 } 952 953 @Override 954 public HTableDescriptor[] disableTables(String regex) throws IOException { 955 return disableTables(Pattern.compile(regex)); 956 } 957 958 @Override 959 public HTableDescriptor[] disableTables(Pattern pattern) throws IOException { 960 List<HTableDescriptor> failed = new LinkedList<>(); 961 for (HTableDescriptor table : listTables(pattern)) { 962 if (isTableEnabled(table.getTableName())) { 963 try { 964 disableTable(table.getTableName()); 965 } catch (IOException ex) { 966 LOG.info("Failed to disable table " + table.getTableName(), ex); 967 failed.add(table); 968 } 969 } 970 } 971 return failed.toArray(new HTableDescriptor[failed.size()]); 972 } 973 974 @Override 975 public boolean isTableEnabled(final TableName tableName) throws IOException { 976 checkTableExists(tableName); 977 return executeCallable(new RpcRetryingCallable<Boolean>() { 978 @Override 979 protected Boolean rpcCall(int callTimeout) throws Exception { 980 TableState tableState = MetaTableAccessor.getTableState(getConnection(), tableName); 981 if (tableState == null) { 982 throw new TableNotFoundException(tableName); 983 } 984 return tableState.inStates(TableState.State.ENABLED); 985 } 986 }); 987 } 988 989 @Override 990 public boolean isTableDisabled(TableName tableName) throws IOException { 991 checkTableExists(tableName); 992 return connection.isTableDisabled(tableName); 993 } 994 995 @Override 996 public boolean isTableAvailable(TableName tableName) throws IOException { 997 return connection.isTableAvailable(tableName, null); 998 } 999 1000 @Override 1001 public boolean isTableAvailable(TableName tableName, byte[][] splitKeys) throws IOException { 1002 return connection.isTableAvailable(tableName, splitKeys); 1003 } 1004 1005 @Override 1006 public Pair<Integer, Integer> getAlterStatus(final TableName tableName) throws IOException { 1007 return executeCallable(new MasterCallable<Pair<Integer, Integer>>(getConnection(), 1008 getRpcControllerFactory()) { 1009 @Override 1010 protected Pair<Integer, Integer> rpcCall() throws Exception { 1011 setPriority(tableName); 1012 GetSchemaAlterStatusRequest req = RequestConverter 1013 .buildGetSchemaAlterStatusRequest(tableName); 1014 GetSchemaAlterStatusResponse ret = master.getSchemaAlterStatus(getRpcController(), req); 1015 Pair<Integer, Integer> pair = new Pair<>(ret.getYetToUpdateRegions(), 1016 ret.getTotalRegions()); 1017 return pair; 1018 } 1019 }); 1020 } 1021 1022 @Override 1023 public Pair<Integer, Integer> getAlterStatus(final byte[] tableName) throws IOException { 1024 return getAlterStatus(TableName.valueOf(tableName)); 1025 } 1026 1027 @Override 1028 public void addColumnFamily(final TableName tableName, final ColumnFamilyDescriptor columnFamily) 1029 throws IOException { 1030 get(addColumnFamilyAsync(tableName, columnFamily), syncWaitTimeout, TimeUnit.MILLISECONDS); 1031 } 1032 1033 @Override 1034 public Future<Void> addColumnFamilyAsync(final TableName tableName, 1035 final ColumnFamilyDescriptor columnFamily) throws IOException { 1036 AddColumnResponse response = 1037 executeCallable(new MasterCallable<AddColumnResponse>(getConnection(), 1038 getRpcControllerFactory()) { 1039 Long nonceGroup = ng.getNonceGroup(); 1040 Long nonce = ng.newNonce(); 1041 @Override 1042 protected AddColumnResponse rpcCall() throws Exception { 1043 setPriority(tableName); 1044 AddColumnRequest req = 1045 RequestConverter.buildAddColumnRequest(tableName, columnFamily, nonceGroup, nonce); 1046 return master.addColumn(getRpcController(), req); 1047 } 1048 }); 1049 return new AddColumnFamilyFuture(this, tableName, response); 1050 } 1051 1052 private static class AddColumnFamilyFuture extends ModifyTableFuture { 1053 public AddColumnFamilyFuture(final HBaseAdmin admin, final TableName tableName, 1054 final AddColumnResponse response) { 1055 super(admin, tableName, (response != null && response.hasProcId()) ? response.getProcId() 1056 : null); 1057 } 1058 1059 @Override 1060 public String getOperationType() { 1061 return "ADD_COLUMN_FAMILY"; 1062 } 1063 } 1064 1065 /** 1066 * {@inheritDoc} 1067 * @deprecated Since 2.0. Will be removed in 3.0. Use 1068 * {@link #deleteColumnFamily(TableName, byte[])} instead. 1069 */ 1070 @Override 1071 @Deprecated 1072 public void deleteColumn(final TableName tableName, final byte[] columnFamily) 1073 throws IOException { 1074 deleteColumnFamily(tableName, columnFamily); 1075 } 1076 1077 @Override 1078 public void deleteColumnFamily(final TableName tableName, final byte[] columnFamily) 1079 throws IOException { 1080 get(deleteColumnFamilyAsync(tableName, columnFamily), syncWaitTimeout, TimeUnit.MILLISECONDS); 1081 } 1082 1083 @Override 1084 public Future<Void> deleteColumnFamilyAsync(final TableName tableName, final byte[] columnFamily) 1085 throws IOException { 1086 DeleteColumnResponse response = 1087 executeCallable(new MasterCallable<DeleteColumnResponse>(getConnection(), 1088 getRpcControllerFactory()) { 1089 Long nonceGroup = ng.getNonceGroup(); 1090 Long nonce = ng.newNonce(); 1091 @Override 1092 protected DeleteColumnResponse rpcCall() throws Exception { 1093 setPriority(tableName); 1094 DeleteColumnRequest req = 1095 RequestConverter.buildDeleteColumnRequest(tableName, columnFamily, 1096 nonceGroup, nonce); 1097 return master.deleteColumn(getRpcController(), req); 1098 } 1099 }); 1100 return new DeleteColumnFamilyFuture(this, tableName, response); 1101 } 1102 1103 private static class DeleteColumnFamilyFuture extends ModifyTableFuture { 1104 public DeleteColumnFamilyFuture(final HBaseAdmin admin, final TableName tableName, 1105 final DeleteColumnResponse response) { 1106 super(admin, tableName, (response != null && response.hasProcId()) ? response.getProcId() 1107 : null); 1108 } 1109 1110 @Override 1111 public String getOperationType() { 1112 return "DELETE_COLUMN_FAMILY"; 1113 } 1114 } 1115 1116 @Override 1117 public void modifyColumnFamily(final TableName tableName, 1118 final ColumnFamilyDescriptor columnFamily) throws IOException { 1119 get(modifyColumnFamilyAsync(tableName, columnFamily), syncWaitTimeout, TimeUnit.MILLISECONDS); 1120 } 1121 1122 @Override 1123 public Future<Void> modifyColumnFamilyAsync(final TableName tableName, 1124 final ColumnFamilyDescriptor columnFamily) throws IOException { 1125 ModifyColumnResponse response = 1126 executeCallable(new MasterCallable<ModifyColumnResponse>(getConnection(), 1127 getRpcControllerFactory()) { 1128 Long nonceGroup = ng.getNonceGroup(); 1129 Long nonce = ng.newNonce(); 1130 @Override 1131 protected ModifyColumnResponse rpcCall() throws Exception { 1132 setPriority(tableName); 1133 ModifyColumnRequest req = 1134 RequestConverter.buildModifyColumnRequest(tableName, columnFamily, 1135 nonceGroup, nonce); 1136 return master.modifyColumn(getRpcController(), req); 1137 } 1138 }); 1139 return new ModifyColumnFamilyFuture(this, tableName, response); 1140 } 1141 1142 private static class ModifyColumnFamilyFuture extends ModifyTableFuture { 1143 public ModifyColumnFamilyFuture(final HBaseAdmin admin, final TableName tableName, 1144 final ModifyColumnResponse response) { 1145 super(admin, tableName, (response != null && response.hasProcId()) ? response.getProcId() 1146 : null); 1147 } 1148 1149 @Override 1150 public String getOperationType() { 1151 return "MODIFY_COLUMN_FAMILY"; 1152 } 1153 } 1154 1155 @Deprecated 1156 @Override 1157 public void closeRegion(final String regionName, final String unused) throws IOException { 1158 unassign(Bytes.toBytes(regionName), true); 1159 } 1160 1161 @Deprecated 1162 @Override 1163 public void closeRegion(final byte [] regionName, final String unused) throws IOException { 1164 unassign(regionName, true); 1165 } 1166 1167 @Deprecated 1168 @Override 1169 public boolean closeRegionWithEncodedRegionName(final String encodedRegionName, 1170 final String unused) throws IOException { 1171 unassign(Bytes.toBytes(encodedRegionName), true); 1172 return true; 1173 } 1174 1175 @Deprecated 1176 @Override 1177 public void closeRegion(final ServerName unused, final HRegionInfo hri) throws IOException { 1178 unassign(hri.getRegionName(), true); 1179 } 1180 1181 /** 1182 * @param sn 1183 * @return List of {@link HRegionInfo}. 1184 * @throws IOException 1185 * @deprecated As of release 2.0.0, this will be removed in HBase 3.0.0 1186 * Use {@link #getRegions(ServerName)}. 1187 */ 1188 @Deprecated 1189 @Override 1190 public List<HRegionInfo> getOnlineRegions(final ServerName sn) throws IOException { 1191 return getRegions(sn).stream().map(ImmutableHRegionInfo::new).collect(Collectors.toList()); 1192 } 1193 1194 @Override 1195 public void flush(final TableName tableName) throws IOException { 1196 checkTableExists(tableName); 1197 if (isTableDisabled(tableName)) { 1198 LOG.info("Table is disabled: " + tableName.getNameAsString()); 1199 return; 1200 } 1201 execProcedure("flush-table-proc", tableName.getNameAsString(), new HashMap<>()); 1202 } 1203 1204 @Override 1205 public void flushRegion(final byte[] regionName) throws IOException { 1206 Pair<RegionInfo, ServerName> regionServerPair = getRegion(regionName); 1207 if (regionServerPair == null) { 1208 throw new IllegalArgumentException("Unknown regionname: " + Bytes.toStringBinary(regionName)); 1209 } 1210 if (regionServerPair.getSecond() == null) { 1211 throw new NoServerForRegionException(Bytes.toStringBinary(regionName)); 1212 } 1213 final RegionInfo regionInfo = regionServerPair.getFirst(); 1214 ServerName serverName = regionServerPair.getSecond(); 1215 flush(this.connection.getAdmin(serverName), regionInfo); 1216 } 1217 1218 private void flush(AdminService.BlockingInterface admin, final RegionInfo info) 1219 throws IOException { 1220 ProtobufUtil.call(() -> { 1221 // TODO: There is no timeout on this controller. Set one! 1222 HBaseRpcController controller = rpcControllerFactory.newController(); 1223 FlushRegionRequest request = 1224 RequestConverter.buildFlushRegionRequest(info.getRegionName()); 1225 admin.flushRegion(controller, request); 1226 return null; 1227 }); 1228 } 1229 1230 @Override 1231 public void flushRegionServer(ServerName serverName) throws IOException { 1232 for (RegionInfo region : getRegions(serverName)) { 1233 flush(this.connection.getAdmin(serverName), region); 1234 } 1235 } 1236 1237 /** 1238 * {@inheritDoc} 1239 */ 1240 @Override 1241 public void compact(final TableName tableName) 1242 throws IOException { 1243 compact(tableName, null, false, CompactType.NORMAL); 1244 } 1245 1246 @Override 1247 public void compactRegion(final byte[] regionName) 1248 throws IOException { 1249 compactRegion(regionName, null, false); 1250 } 1251 1252 /** 1253 * {@inheritDoc} 1254 */ 1255 @Override 1256 public void compact(final TableName tableName, final byte[] columnFamily) 1257 throws IOException { 1258 compact(tableName, columnFamily, false, CompactType.NORMAL); 1259 } 1260 1261 /** 1262 * {@inheritDoc} 1263 */ 1264 @Override 1265 public void compactRegion(final byte[] regionName, final byte[] columnFamily) 1266 throws IOException { 1267 compactRegion(regionName, columnFamily, false); 1268 } 1269 1270 @Override 1271 public void compactRegionServer(final ServerName serverName) throws IOException { 1272 for (RegionInfo region : getRegions(serverName)) { 1273 compact(this.connection.getAdmin(serverName), region, false, null); 1274 } 1275 } 1276 1277 @Override 1278 public void majorCompactRegionServer(final ServerName serverName) throws IOException { 1279 for (RegionInfo region : getRegions(serverName)) { 1280 compact(this.connection.getAdmin(serverName), region, true, null); 1281 } 1282 } 1283 1284 @Override 1285 public void majorCompact(final TableName tableName) 1286 throws IOException { 1287 compact(tableName, null, true, CompactType.NORMAL); 1288 } 1289 1290 @Override 1291 public void majorCompactRegion(final byte[] regionName) 1292 throws IOException { 1293 compactRegion(regionName, null, true); 1294 } 1295 1296 /** 1297 * {@inheritDoc} 1298 */ 1299 @Override 1300 public void majorCompact(final TableName tableName, final byte[] columnFamily) 1301 throws IOException { 1302 compact(tableName, columnFamily, true, CompactType.NORMAL); 1303 } 1304 1305 @Override 1306 public void majorCompactRegion(final byte[] regionName, final byte[] columnFamily) 1307 throws IOException { 1308 compactRegion(regionName, columnFamily, true); 1309 } 1310 1311 /** 1312 * Compact a table. 1313 * Asynchronous operation. 1314 * 1315 * @param tableName table or region to compact 1316 * @param columnFamily column family within a table or region 1317 * @param major True if we are to do a major compaction. 1318 * @param compactType {@link org.apache.hadoop.hbase.client.CompactType} 1319 * @throws IOException if a remote or network exception occurs 1320 */ 1321 private void compact(final TableName tableName, final byte[] columnFamily,final boolean major, 1322 CompactType compactType) throws IOException { 1323 switch (compactType) { 1324 case MOB: 1325 compact(this.connection.getAdminForMaster(), RegionInfo.createMobRegionInfo(tableName), 1326 major, columnFamily); 1327 break; 1328 case NORMAL: 1329 checkTableExists(tableName); 1330 for (HRegionLocation loc :connection.locateRegions(tableName, false, false)) { 1331 ServerName sn = loc.getServerName(); 1332 if (sn == null) { 1333 continue; 1334 } 1335 try { 1336 compact(this.connection.getAdmin(sn), loc.getRegion(), major, columnFamily); 1337 } catch (NotServingRegionException e) { 1338 if (LOG.isDebugEnabled()) { 1339 LOG.debug("Trying to" + (major ? " major" : "") + " compact " + loc.getRegion() + 1340 ": " + StringUtils.stringifyException(e)); 1341 } 1342 } 1343 } 1344 break; 1345 default: 1346 throw new IllegalArgumentException("Unknown compactType: " + compactType); 1347 } 1348 } 1349 1350 /** 1351 * Compact an individual region. 1352 * Asynchronous operation. 1353 * 1354 * @param regionName region to compact 1355 * @param columnFamily column family within a table or region 1356 * @param major True if we are to do a major compaction. 1357 * @throws IOException if a remote or network exception occurs 1358 * @throws InterruptedException 1359 */ 1360 private void compactRegion(final byte[] regionName, final byte[] columnFamily, 1361 final boolean major) throws IOException { 1362 Pair<RegionInfo, ServerName> regionServerPair = getRegion(regionName); 1363 if (regionServerPair == null) { 1364 throw new IllegalArgumentException("Invalid region: " + Bytes.toStringBinary(regionName)); 1365 } 1366 if (regionServerPair.getSecond() == null) { 1367 throw new NoServerForRegionException(Bytes.toStringBinary(regionName)); 1368 } 1369 compact(this.connection.getAdmin(regionServerPair.getSecond()), regionServerPair.getFirst(), 1370 major, columnFamily); 1371 } 1372 1373 private void compact(AdminService.BlockingInterface admin, RegionInfo hri, boolean major, 1374 byte[] family) throws IOException { 1375 Callable<Void> callable = new Callable<Void>() { 1376 @Override 1377 public Void call() throws Exception { 1378 // TODO: There is no timeout on this controller. Set one! 1379 HBaseRpcController controller = rpcControllerFactory.newController(); 1380 CompactRegionRequest request = 1381 RequestConverter.buildCompactRegionRequest(hri.getRegionName(), major, family); 1382 admin.compactRegion(controller, request); 1383 return null; 1384 } 1385 }; 1386 ProtobufUtil.call(callable); 1387 } 1388 1389 @Override 1390 public void move(final byte[] encodedRegionName, final byte[] destServerName) throws IOException { 1391 executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) { 1392 @Override 1393 protected Void rpcCall() throws Exception { 1394 setPriority(encodedRegionName); 1395 MoveRegionRequest request = 1396 RequestConverter.buildMoveRegionRequest(encodedRegionName, 1397 destServerName != null ? ServerName.valueOf(Bytes.toString(destServerName)) : null); 1398 master.moveRegion(getRpcController(), request); 1399 return null; 1400 } 1401 }); 1402 } 1403 1404 @Override 1405 public void assign(final byte [] regionName) throws MasterNotRunningException, 1406 ZooKeeperConnectionException, IOException { 1407 executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) { 1408 @Override 1409 protected Void rpcCall() throws Exception { 1410 setPriority(regionName); 1411 AssignRegionRequest request = 1412 RequestConverter.buildAssignRegionRequest(getRegionName(regionName)); 1413 master.assignRegion(getRpcController(), request); 1414 return null; 1415 } 1416 }); 1417 } 1418 1419 @Override 1420 public void unassign(final byte [] regionName, final boolean force) throws IOException { 1421 final byte[] toBeUnassigned = getRegionName(regionName); 1422 executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) { 1423 @Override 1424 protected Void rpcCall() throws Exception { 1425 setPriority(regionName); 1426 UnassignRegionRequest request = 1427 RequestConverter.buildUnassignRegionRequest(toBeUnassigned, force); 1428 master.unassignRegion(getRpcController(), request); 1429 return null; 1430 } 1431 }); 1432 } 1433 1434 @Override 1435 public void offline(final byte [] regionName) 1436 throws IOException { 1437 executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) { 1438 @Override 1439 protected Void rpcCall() throws Exception { 1440 setPriority(regionName); 1441 master.offlineRegion(getRpcController(), 1442 RequestConverter.buildOfflineRegionRequest(regionName)); 1443 return null; 1444 } 1445 }); 1446 } 1447 1448 @Override 1449 public boolean balancerSwitch(final boolean on, final boolean synchronous) 1450 throws IOException { 1451 return executeCallable(new MasterCallable<Boolean>(getConnection(), getRpcControllerFactory()) { 1452 @Override 1453 protected Boolean rpcCall() throws Exception { 1454 SetBalancerRunningRequest req = 1455 RequestConverter.buildSetBalancerRunningRequest(on, synchronous); 1456 return master.setBalancerRunning(getRpcController(), req).getPrevBalanceValue(); 1457 } 1458 }); 1459 } 1460 1461 @Override 1462 public boolean balance() throws IOException { 1463 return executeCallable(new MasterCallable<Boolean>(getConnection(), getRpcControllerFactory()) { 1464 @Override 1465 protected Boolean rpcCall() throws Exception { 1466 return master.balance(getRpcController(), 1467 RequestConverter.buildBalanceRequest(false)).getBalancerRan(); 1468 } 1469 }); 1470 } 1471 1472 @Override 1473 public boolean balance(final boolean force) throws IOException { 1474 return executeCallable(new MasterCallable<Boolean>(getConnection(), getRpcControllerFactory()) { 1475 @Override 1476 protected Boolean rpcCall() throws Exception { 1477 return master.balance(getRpcController(), 1478 RequestConverter.buildBalanceRequest(force)).getBalancerRan(); 1479 } 1480 }); 1481 } 1482 1483 @Override 1484 public boolean isBalancerEnabled() throws IOException { 1485 return executeCallable(new MasterCallable<Boolean>(getConnection(), getRpcControllerFactory()) { 1486 @Override 1487 protected Boolean rpcCall() throws Exception { 1488 return master.isBalancerEnabled(getRpcController(), 1489 RequestConverter.buildIsBalancerEnabledRequest()).getEnabled(); 1490 } 1491 }); 1492 } 1493 1494 /** 1495 * {@inheritDoc} 1496 */ 1497 @Override 1498 public CacheEvictionStats clearBlockCache(final TableName tableName) throws IOException { 1499 checkTableExists(tableName); 1500 CacheEvictionStatsBuilder cacheEvictionStats = CacheEvictionStats.builder(); 1501 List<Pair<RegionInfo, ServerName>> pairs = 1502 MetaTableAccessor.getTableRegionsAndLocations(connection, tableName); 1503 Map<ServerName, List<RegionInfo>> regionInfoByServerName = 1504 pairs.stream() 1505 .filter(pair -> !(pair.getFirst().isOffline())) 1506 .filter(pair -> pair.getSecond() != null) 1507 .collect(Collectors.groupingBy(pair -> pair.getSecond(), 1508 Collectors.mapping(pair -> pair.getFirst(), Collectors.toList()))); 1509 1510 for (Map.Entry<ServerName, List<RegionInfo>> entry : regionInfoByServerName.entrySet()) { 1511 CacheEvictionStats stats = clearBlockCache(entry.getKey(), entry.getValue()); 1512 cacheEvictionStats = cacheEvictionStats.append(stats); 1513 if (stats.getExceptionCount() > 0) { 1514 for (Map.Entry<byte[], Throwable> exception : stats.getExceptions().entrySet()) { 1515 LOG.debug("Failed to clear block cache for " 1516 + Bytes.toStringBinary(exception.getKey()) 1517 + " on " + entry.getKey() + ": ", exception.getValue()); 1518 } 1519 } 1520 } 1521 return cacheEvictionStats.build(); 1522 } 1523 1524 private CacheEvictionStats clearBlockCache(final ServerName sn, final List<RegionInfo> hris) 1525 throws IOException { 1526 HBaseRpcController controller = rpcControllerFactory.newController(); 1527 AdminService.BlockingInterface admin = this.connection.getAdmin(sn); 1528 ClearRegionBlockCacheRequest request = 1529 RequestConverter.buildClearRegionBlockCacheRequest(hris); 1530 ClearRegionBlockCacheResponse response; 1531 try { 1532 response = admin.clearRegionBlockCache(controller, request); 1533 return ProtobufUtil.toCacheEvictionStats(response.getStats()); 1534 } catch (ServiceException se) { 1535 throw ProtobufUtil.getRemoteException(se); 1536 } 1537 } 1538 1539 /** 1540 * Invoke region normalizer. Can NOT run for various reasons. Check logs. 1541 * 1542 * @return True if region normalizer ran, false otherwise. 1543 */ 1544 @Override 1545 public boolean normalize() throws IOException { 1546 return executeCallable(new MasterCallable<Boolean>(getConnection(), getRpcControllerFactory()) { 1547 @Override 1548 protected Boolean rpcCall() throws Exception { 1549 return master.normalize(getRpcController(), 1550 RequestConverter.buildNormalizeRequest()).getNormalizerRan(); 1551 } 1552 }); 1553 } 1554 1555 @Override 1556 public boolean isNormalizerEnabled() throws IOException { 1557 return executeCallable(new MasterCallable<Boolean>(getConnection(), getRpcControllerFactory()) { 1558 @Override 1559 protected Boolean rpcCall() throws Exception { 1560 return master.isNormalizerEnabled(getRpcController(), 1561 RequestConverter.buildIsNormalizerEnabledRequest()).getEnabled(); 1562 } 1563 }); 1564 } 1565 1566 @Override 1567 public boolean normalizerSwitch(final boolean on) throws IOException { 1568 return executeCallable(new MasterCallable<Boolean>(getConnection(), getRpcControllerFactory()) { 1569 @Override 1570 protected Boolean rpcCall() throws Exception { 1571 SetNormalizerRunningRequest req = 1572 RequestConverter.buildSetNormalizerRunningRequest(on); 1573 return master.setNormalizerRunning(getRpcController(), req).getPrevNormalizerValue(); 1574 } 1575 }); 1576 } 1577 1578 @Override 1579 public boolean catalogJanitorSwitch(final boolean enable) throws IOException { 1580 return executeCallable(new MasterCallable<Boolean>(getConnection(), getRpcControllerFactory()) { 1581 @Override 1582 protected Boolean rpcCall() throws Exception { 1583 return master.enableCatalogJanitor(getRpcController(), 1584 RequestConverter.buildEnableCatalogJanitorRequest(enable)).getPrevValue(); 1585 } 1586 }); 1587 } 1588 1589 @Override 1590 public int runCatalogJanitor() throws IOException { 1591 return executeCallable(new MasterCallable<Integer>(getConnection(), getRpcControllerFactory()) { 1592 @Override 1593 protected Integer rpcCall() throws Exception { 1594 return master.runCatalogScan(getRpcController(), 1595 RequestConverter.buildCatalogScanRequest()).getScanResult(); 1596 } 1597 }); 1598 } 1599 1600 @Override 1601 public boolean isCatalogJanitorEnabled() throws IOException { 1602 return executeCallable(new MasterCallable<Boolean>(getConnection(), getRpcControllerFactory()) { 1603 @Override 1604 protected Boolean rpcCall() throws Exception { 1605 return master.isCatalogJanitorEnabled(getRpcController(), 1606 RequestConverter.buildIsCatalogJanitorEnabledRequest()).getValue(); 1607 } 1608 }); 1609 } 1610 1611 @Override 1612 public boolean cleanerChoreSwitch(final boolean on) throws IOException { 1613 return executeCallable(new MasterCallable<Boolean>(getConnection(), getRpcControllerFactory()) { 1614 @Override public Boolean rpcCall() throws Exception { 1615 return master.setCleanerChoreRunning(getRpcController(), 1616 RequestConverter.buildSetCleanerChoreRunningRequest(on)).getPrevValue(); 1617 } 1618 }); 1619 } 1620 1621 @Override 1622 public boolean runCleanerChore() throws IOException { 1623 return executeCallable(new MasterCallable<Boolean>(getConnection(), getRpcControllerFactory()) { 1624 @Override public Boolean rpcCall() throws Exception { 1625 return master.runCleanerChore(getRpcController(), 1626 RequestConverter.buildRunCleanerChoreRequest()).getCleanerChoreRan(); 1627 } 1628 }); 1629 } 1630 1631 @Override 1632 public boolean isCleanerChoreEnabled() throws IOException { 1633 return executeCallable(new MasterCallable<Boolean>(getConnection(), getRpcControllerFactory()) { 1634 @Override public Boolean rpcCall() throws Exception { 1635 return master.isCleanerChoreEnabled(getRpcController(), 1636 RequestConverter.buildIsCleanerChoreEnabledRequest()).getValue(); 1637 } 1638 }); 1639 } 1640 1641 /** 1642 * Merge two regions. Synchronous operation. 1643 * Note: It is not feasible to predict the length of merge. 1644 * Therefore, this is for internal testing only. 1645 * @param nameOfRegionA encoded or full name of region a 1646 * @param nameOfRegionB encoded or full name of region b 1647 * @param forcible true if do a compulsory merge, otherwise we will only merge 1648 * two adjacent regions 1649 * @throws IOException 1650 */ 1651 @VisibleForTesting 1652 public void mergeRegionsSync( 1653 final byte[] nameOfRegionA, 1654 final byte[] nameOfRegionB, 1655 final boolean forcible) throws IOException { 1656 get( 1657 mergeRegionsAsync(nameOfRegionA, nameOfRegionB, forcible), 1658 syncWaitTimeout, 1659 TimeUnit.MILLISECONDS); 1660 } 1661 1662 /** 1663 * Merge two regions. Asynchronous operation. 1664 * @param nameOfRegionA encoded or full name of region a 1665 * @param nameOfRegionB encoded or full name of region b 1666 * @param forcible true if do a compulsory merge, otherwise we will only merge 1667 * two adjacent regions 1668 * @throws IOException 1669 * @deprecated Since 2.0. Will be removed in 3.0. Use 1670 * {@link #mergeRegionsAsync(byte[], byte[], boolean)} instead. 1671 */ 1672 @Deprecated 1673 @Override 1674 public void mergeRegions(final byte[] nameOfRegionA, 1675 final byte[] nameOfRegionB, final boolean forcible) 1676 throws IOException { 1677 mergeRegionsAsync(nameOfRegionA, nameOfRegionB, forcible); 1678 } 1679 1680 /** 1681 * Merge two regions. Asynchronous operation. 1682 * @param nameOfRegionA encoded or full name of region a 1683 * @param nameOfRegionB encoded or full name of region b 1684 * @param forcible true if do a compulsory merge, otherwise we will only merge 1685 * two adjacent regions 1686 * @throws IOException 1687 */ 1688 @Override 1689 public Future<Void> mergeRegionsAsync( 1690 final byte[] nameOfRegionA, 1691 final byte[] nameOfRegionB, 1692 final boolean forcible) throws IOException { 1693 byte[][] nameofRegionsToMerge = new byte[2][]; 1694 nameofRegionsToMerge[0] = nameOfRegionA; 1695 nameofRegionsToMerge[1] = nameOfRegionB; 1696 return mergeRegionsAsync(nameofRegionsToMerge, forcible); 1697 } 1698 1699 /** 1700 * Merge two regions. Asynchronous operation. 1701 * @param nameofRegionsToMerge encoded or full name of daughter regions 1702 * @param forcible true if do a compulsory merge, otherwise we will only merge 1703 * adjacent regions 1704 * @throws IOException 1705 */ 1706 @Override 1707 public Future<Void> mergeRegionsAsync( 1708 final byte[][] nameofRegionsToMerge, 1709 final boolean forcible) throws IOException { 1710 assert(nameofRegionsToMerge.length >= 2); 1711 byte[][] encodedNameofRegionsToMerge = new byte[nameofRegionsToMerge.length][]; 1712 for(int i = 0; i < nameofRegionsToMerge.length; i++) { 1713 encodedNameofRegionsToMerge[i] = HRegionInfo.isEncodedRegionName(nameofRegionsToMerge[i]) ? 1714 nameofRegionsToMerge[i] : 1715 Bytes.toBytes(HRegionInfo.encodeRegionName(nameofRegionsToMerge[i])); 1716 } 1717 1718 TableName tableName = null; 1719 Pair<RegionInfo, ServerName> pair; 1720 1721 for(int i = 0; i < nameofRegionsToMerge.length; i++) { 1722 pair = getRegion(nameofRegionsToMerge[i]); 1723 1724 if (pair != null) { 1725 if (pair.getFirst().getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID) { 1726 throw new IllegalArgumentException ("Can't invoke merge on non-default regions directly"); 1727 } 1728 if (tableName == null) { 1729 tableName = pair.getFirst().getTable(); 1730 } else if (!tableName.equals(pair.getFirst().getTable())) { 1731 throw new IllegalArgumentException ("Cannot merge regions from two different tables " + 1732 tableName + " and " + pair.getFirst().getTable()); 1733 } 1734 } else { 1735 throw new UnknownRegionException ( 1736 "Can't invoke merge on unknown region " 1737 + Bytes.toStringBinary(encodedNameofRegionsToMerge[i])); 1738 } 1739 } 1740 1741 MergeTableRegionsResponse response = 1742 executeCallable(new MasterCallable<MergeTableRegionsResponse>(getConnection(), 1743 getRpcControllerFactory()) { 1744 Long nonceGroup = ng.getNonceGroup(); 1745 Long nonce = ng.newNonce(); 1746 @Override 1747 protected MergeTableRegionsResponse rpcCall() throws Exception { 1748 MergeTableRegionsRequest request = RequestConverter 1749 .buildMergeTableRegionsRequest( 1750 encodedNameofRegionsToMerge, 1751 forcible, 1752 nonceGroup, 1753 nonce); 1754 return master.mergeTableRegions(getRpcController(), request); 1755 } 1756 }); 1757 return new MergeTableRegionsFuture(this, tableName, response); 1758 } 1759 1760 private static class MergeTableRegionsFuture extends TableFuture<Void> { 1761 public MergeTableRegionsFuture( 1762 final HBaseAdmin admin, 1763 final TableName tableName, 1764 final MergeTableRegionsResponse response) { 1765 super(admin, tableName, 1766 (response != null && response.hasProcId()) ? response.getProcId() : null); 1767 } 1768 1769 public MergeTableRegionsFuture( 1770 final HBaseAdmin admin, 1771 final TableName tableName, 1772 final Long procId) { 1773 super(admin, tableName, procId); 1774 } 1775 1776 @Override 1777 public String getOperationType() { 1778 return "MERGE_REGIONS"; 1779 } 1780 } 1781 /** 1782 * Split one region. Synchronous operation. 1783 * Note: It is not feasible to predict the length of split. 1784 * Therefore, this is for internal testing only. 1785 * @param regionName encoded or full name of region 1786 * @param splitPoint key where region splits 1787 * @throws IOException 1788 */ 1789 @VisibleForTesting 1790 public void splitRegionSync(byte[] regionName, byte[] splitPoint) throws IOException { 1791 splitRegionSync(regionName, splitPoint, syncWaitTimeout, TimeUnit.MILLISECONDS); 1792 } 1793 1794 1795 /** 1796 * Split one region. Synchronous operation. 1797 * @param regionName region to be split 1798 * @param splitPoint split point 1799 * @param timeout how long to wait on split 1800 * @param units time units 1801 * @throws IOException 1802 */ 1803 public void splitRegionSync(byte[] regionName, byte[] splitPoint, 1804 final long timeout, final TimeUnit units) throws IOException { 1805 get( 1806 splitRegionAsync(regionName, splitPoint), 1807 timeout, 1808 units); 1809 } 1810 1811 @Override 1812 public Future<Void> splitRegionAsync(byte[] regionName, byte[] splitPoint) 1813 throws IOException { 1814 byte[] encodedNameofRegionToSplit = HRegionInfo.isEncodedRegionName(regionName) ? 1815 regionName : Bytes.toBytes(HRegionInfo.encodeRegionName(regionName)); 1816 Pair<RegionInfo, ServerName> pair = getRegion(regionName); 1817 if (pair != null) { 1818 if (pair.getFirst() != null && 1819 pair.getFirst().getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID) { 1820 throw new IllegalArgumentException ("Can't invoke split on non-default regions directly"); 1821 } 1822 } else { 1823 throw new UnknownRegionException ( 1824 "Can't invoke merge on unknown region " 1825 + Bytes.toStringBinary(encodedNameofRegionToSplit)); 1826 } 1827 1828 return splitRegionAsync(pair.getFirst(), splitPoint); 1829 } 1830 1831 Future<Void> splitRegionAsync(RegionInfo hri, byte[] splitPoint) throws IOException { 1832 TableName tableName = hri.getTable(); 1833 if (hri.getStartKey() != null && splitPoint != null && 1834 Bytes.compareTo(hri.getStartKey(), splitPoint) == 0) { 1835 throw new IOException("should not give a splitkey which equals to startkey!"); 1836 } 1837 1838 SplitTableRegionResponse response = executeCallable( 1839 new MasterCallable<SplitTableRegionResponse>(getConnection(), getRpcControllerFactory()) { 1840 Long nonceGroup = ng.getNonceGroup(); 1841 Long nonce = ng.newNonce(); 1842 @Override 1843 protected SplitTableRegionResponse rpcCall() throws Exception { 1844 setPriority(tableName); 1845 SplitTableRegionRequest request = RequestConverter 1846 .buildSplitTableRegionRequest(hri, splitPoint, nonceGroup, nonce); 1847 return master.splitRegion(getRpcController(), request); 1848 } 1849 }); 1850 return new SplitTableRegionFuture(this, tableName, response); 1851 } 1852 1853 private static class SplitTableRegionFuture extends TableFuture<Void> { 1854 public SplitTableRegionFuture(final HBaseAdmin admin, 1855 final TableName tableName, 1856 final SplitTableRegionResponse response) { 1857 super(admin, tableName, 1858 (response != null && response.hasProcId()) ? response.getProcId() : null); 1859 } 1860 1861 public SplitTableRegionFuture( 1862 final HBaseAdmin admin, 1863 final TableName tableName, 1864 final Long procId) { 1865 super(admin, tableName, procId); 1866 } 1867 1868 @Override 1869 public String getOperationType() { 1870 return "SPLIT_REGION"; 1871 } 1872 } 1873 1874 @Override 1875 public void split(final TableName tableName) throws IOException { 1876 split(tableName, null); 1877 } 1878 1879 @Override 1880 public void splitRegion(final byte[] regionName) throws IOException { 1881 splitRegion(regionName, null); 1882 } 1883 1884 @Override 1885 public void split(final TableName tableName, final byte[] splitPoint) throws IOException { 1886 checkTableExists(tableName); 1887 for (HRegionLocation loc : connection.locateRegions(tableName, false, false)) { 1888 ServerName sn = loc.getServerName(); 1889 if (sn == null) { 1890 continue; 1891 } 1892 RegionInfo r = loc.getRegion(); 1893 // check for parents 1894 if (r.isSplitParent()) { 1895 continue; 1896 } 1897 // if a split point given, only split that particular region 1898 if (r.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID || 1899 (splitPoint != null && !r.containsRow(splitPoint))) { 1900 continue; 1901 } 1902 // call out to master to do split now 1903 splitRegionAsync(r, splitPoint); 1904 } 1905 } 1906 1907 @Override 1908 public void splitRegion(final byte[] regionName, final byte [] splitPoint) throws IOException { 1909 Pair<RegionInfo, ServerName> regionServerPair = getRegion(regionName); 1910 if (regionServerPair == null) { 1911 throw new IllegalArgumentException("Invalid region: " + Bytes.toStringBinary(regionName)); 1912 } 1913 if (regionServerPair.getFirst() != null && 1914 regionServerPair.getFirst().getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID) { 1915 throw new IllegalArgumentException("Can't split replicas directly. " 1916 + "Replicas are auto-split when their primary is split."); 1917 } 1918 if (regionServerPair.getSecond() == null) { 1919 throw new NoServerForRegionException(Bytes.toStringBinary(regionName)); 1920 } 1921 splitRegionAsync(regionServerPair.getFirst(), splitPoint); 1922 } 1923 1924 @Override 1925 public void modifyTable(final TableName tableName, final TableDescriptor td) 1926 throws IOException { 1927 get(modifyTableAsync(tableName, td), syncWaitTimeout, TimeUnit.MILLISECONDS); 1928 } 1929 1930 @Override 1931 public Future<Void> modifyTableAsync(final TableName tableName, final TableDescriptor td) 1932 throws IOException { 1933 if (!tableName.equals(td.getTableName())) { 1934 throw new IllegalArgumentException("the specified table name '" + tableName + 1935 "' doesn't match with the HTD one: " + td.getTableName()); 1936 } 1937 return modifyTableAsync(td); 1938 } 1939 1940 private static class ModifyTableFuture extends TableFuture<Void> { 1941 public ModifyTableFuture(final HBaseAdmin admin, final TableName tableName, 1942 final ModifyTableResponse response) { 1943 super(admin, tableName, 1944 (response != null && response.hasProcId()) ? response.getProcId() : null); 1945 } 1946 1947 public ModifyTableFuture(final HBaseAdmin admin, final TableName tableName, final Long procId) { 1948 super(admin, tableName, procId); 1949 } 1950 1951 @Override 1952 public String getOperationType() { 1953 return "MODIFY"; 1954 } 1955 1956 @Override 1957 protected Void postOperationResult(final Void result, final long deadlineTs) 1958 throws IOException, TimeoutException { 1959 // The modify operation on the table is asynchronous on the server side irrespective 1960 // of whether Procedure V2 is supported or not. So, we wait in the client till 1961 // all regions get updated. 1962 waitForSchemaUpdate(deadlineTs); 1963 return result; 1964 } 1965 } 1966 1967 /** 1968 * @param regionName Name of a region. 1969 * @return a pair of HRegionInfo and ServerName if <code>regionName</code> is 1970 * a verified region name (we call {@link 1971 * MetaTableAccessor#getRegionLocation(Connection, byte[])} 1972 * else null. 1973 * Throw IllegalArgumentException if <code>regionName</code> is null. 1974 * @throws IOException 1975 */ 1976 Pair<RegionInfo, ServerName> getRegion(final byte[] regionName) throws IOException { 1977 if (regionName == null) { 1978 throw new IllegalArgumentException("Pass a table name or region name"); 1979 } 1980 Pair<RegionInfo, ServerName> pair = MetaTableAccessor.getRegion(connection, regionName); 1981 if (pair == null) { 1982 final AtomicReference<Pair<RegionInfo, ServerName>> result = new AtomicReference<>(null); 1983 final String encodedName = Bytes.toString(regionName); 1984 MetaTableAccessor.Visitor visitor = new MetaTableAccessor.Visitor() { 1985 @Override 1986 public boolean visit(Result data) throws IOException { 1987 RegionInfo info = MetaTableAccessor.getRegionInfo(data); 1988 if (info == null) { 1989 LOG.warn("No serialized HRegionInfo in " + data); 1990 return true; 1991 } 1992 RegionLocations rl = MetaTableAccessor.getRegionLocations(data); 1993 boolean matched = false; 1994 ServerName sn = null; 1995 if (rl != null) { 1996 for (HRegionLocation h : rl.getRegionLocations()) { 1997 if (h != null && encodedName.equals(h.getRegionInfo().getEncodedName())) { 1998 sn = h.getServerName(); 1999 info = h.getRegionInfo(); 2000 matched = true; 2001 } 2002 } 2003 } 2004 if (!matched) return true; 2005 result.set(new Pair<>(info, sn)); 2006 return false; // found the region, stop 2007 } 2008 }; 2009 2010 MetaTableAccessor.fullScanRegions(connection, visitor); 2011 pair = result.get(); 2012 } 2013 return pair; 2014 } 2015 2016 /** 2017 * If the input is a region name, it is returned as is. If it's an 2018 * encoded region name, the corresponding region is found from meta 2019 * and its region name is returned. If we can't find any region in 2020 * meta matching the input as either region name or encoded region 2021 * name, the input is returned as is. We don't throw unknown 2022 * region exception. 2023 */ 2024 private byte[] getRegionName( 2025 final byte[] regionNameOrEncodedRegionName) throws IOException { 2026 if (Bytes.equals(regionNameOrEncodedRegionName, 2027 HRegionInfo.FIRST_META_REGIONINFO.getRegionName()) 2028 || Bytes.equals(regionNameOrEncodedRegionName, 2029 HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes())) { 2030 return HRegionInfo.FIRST_META_REGIONINFO.getRegionName(); 2031 } 2032 byte[] tmp = regionNameOrEncodedRegionName; 2033 Pair<RegionInfo, ServerName> regionServerPair = getRegion(regionNameOrEncodedRegionName); 2034 if (regionServerPair != null && regionServerPair.getFirst() != null) { 2035 tmp = regionServerPair.getFirst().getRegionName(); 2036 } 2037 return tmp; 2038 } 2039 2040 /** 2041 * Check if table exists or not 2042 * @param tableName Name of a table. 2043 * @return tableName instance 2044 * @throws IOException if a remote or network exception occurs. 2045 * @throws TableNotFoundException if table does not exist. 2046 */ 2047 private TableName checkTableExists(final TableName tableName) 2048 throws IOException { 2049 return executeCallable(new RpcRetryingCallable<TableName>() { 2050 @Override 2051 protected TableName rpcCall(int callTimeout) throws Exception { 2052 if (!MetaTableAccessor.tableExists(connection, tableName)) { 2053 throw new TableNotFoundException(tableName); 2054 } 2055 return tableName; 2056 } 2057 }); 2058 } 2059 2060 @Override 2061 public synchronized void shutdown() throws IOException { 2062 executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) { 2063 @Override 2064 protected Void rpcCall() throws Exception { 2065 setPriority(HConstants.HIGH_QOS); 2066 master.shutdown(getRpcController(), ShutdownRequest.newBuilder().build()); 2067 return null; 2068 } 2069 }); 2070 } 2071 2072 @Override 2073 public synchronized void stopMaster() throws IOException { 2074 executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) { 2075 @Override 2076 protected Void rpcCall() throws Exception { 2077 setPriority(HConstants.HIGH_QOS); 2078 master.stopMaster(getRpcController(), StopMasterRequest.newBuilder().build()); 2079 return null; 2080 } 2081 }); 2082 } 2083 2084 @Override 2085 public synchronized void stopRegionServer(final String hostnamePort) 2086 throws IOException { 2087 String hostname = Addressing.parseHostname(hostnamePort); 2088 int port = Addressing.parsePort(hostnamePort); 2089 final AdminService.BlockingInterface admin = 2090 this.connection.getAdmin(ServerName.valueOf(hostname, port, 0)); 2091 // TODO: There is no timeout on this controller. Set one! 2092 HBaseRpcController controller = rpcControllerFactory.newController(); 2093 controller.setPriority(HConstants.HIGH_QOS); 2094 StopServerRequest request = RequestConverter.buildStopServerRequest( 2095 "Called by admin client " + this.connection.toString()); 2096 try { 2097 admin.stopServer(controller, request); 2098 } catch (Exception e) { 2099 throw ProtobufUtil.handleRemoteException(e); 2100 } 2101 } 2102 2103 @Override 2104 public boolean isMasterInMaintenanceMode() throws IOException { 2105 return executeCallable(new MasterCallable<IsInMaintenanceModeResponse>(getConnection(), 2106 this.rpcControllerFactory) { 2107 @Override 2108 protected IsInMaintenanceModeResponse rpcCall() throws Exception { 2109 return master.isMasterInMaintenanceMode(getRpcController(), 2110 IsInMaintenanceModeRequest.newBuilder().build()); 2111 } 2112 }).getInMaintenanceMode(); 2113 } 2114 2115 @Override 2116 public ClusterMetrics getClusterMetrics(EnumSet<Option> options) throws IOException { 2117 return executeCallable(new MasterCallable<ClusterMetrics>(getConnection(), 2118 this.rpcControllerFactory) { 2119 @Override 2120 protected ClusterMetrics rpcCall() throws Exception { 2121 GetClusterStatusRequest req = RequestConverter.buildGetClusterStatusRequest(options); 2122 return ClusterMetricsBuilder.toClusterMetrics( 2123 master.getClusterStatus(getRpcController(), req).getClusterStatus()); 2124 } 2125 }); 2126 } 2127 2128 @Override 2129 public List<RegionMetrics> getRegionMetrics(ServerName serverName, TableName tableName) 2130 throws IOException { 2131 AdminService.BlockingInterface admin = this.connection.getAdmin(serverName); 2132 HBaseRpcController controller = rpcControllerFactory.newController(); 2133 AdminProtos.GetRegionLoadRequest request = 2134 RequestConverter.buildGetRegionLoadRequest(tableName); 2135 try { 2136 return admin.getRegionLoad(controller, request).getRegionLoadsList().stream() 2137 .map(RegionMetricsBuilder::toRegionMetrics).collect(Collectors.toList()); 2138 } catch (ServiceException se) { 2139 throw ProtobufUtil.getRemoteException(se); 2140 } 2141 } 2142 2143 @Override 2144 public Configuration getConfiguration() { 2145 return this.conf; 2146 } 2147 2148 /** 2149 * Do a get with a timeout against the passed in <code>future</code>. 2150 */ 2151 private static <T> T get(final Future<T> future, final long timeout, final TimeUnit units) 2152 throws IOException { 2153 try { 2154 // TODO: how long should we wait? Spin forever? 2155 return future.get(timeout, units); 2156 } catch (InterruptedException e) { 2157 throw new InterruptedIOException("Interrupt while waiting on " + future); 2158 } catch (TimeoutException e) { 2159 throw new TimeoutIOException(e); 2160 } catch (ExecutionException e) { 2161 if (e.getCause() instanceof IOException) { 2162 throw (IOException)e.getCause(); 2163 } else { 2164 throw new IOException(e.getCause()); 2165 } 2166 } 2167 } 2168 2169 @Override 2170 public void createNamespace(final NamespaceDescriptor descriptor) 2171 throws IOException { 2172 get(createNamespaceAsync(descriptor), this.syncWaitTimeout, TimeUnit.MILLISECONDS); 2173 } 2174 2175 @Override 2176 public Future<Void> createNamespaceAsync(final NamespaceDescriptor descriptor) 2177 throws IOException { 2178 CreateNamespaceResponse response = 2179 executeCallable(new MasterCallable<CreateNamespaceResponse>(getConnection(), 2180 getRpcControllerFactory()) { 2181 @Override 2182 protected CreateNamespaceResponse rpcCall() throws Exception { 2183 return master.createNamespace(getRpcController(), 2184 CreateNamespaceRequest.newBuilder().setNamespaceDescriptor(ProtobufUtil. 2185 toProtoNamespaceDescriptor(descriptor)).build()); 2186 } 2187 }); 2188 return new NamespaceFuture(this, descriptor.getName(), response.getProcId()) { 2189 @Override 2190 public String getOperationType() { 2191 return "CREATE_NAMESPACE"; 2192 } 2193 }; 2194 } 2195 2196 @Override 2197 public void modifyNamespace(final NamespaceDescriptor descriptor) 2198 throws IOException { 2199 get(modifyNamespaceAsync(descriptor), this.syncWaitTimeout, TimeUnit.MILLISECONDS); 2200 } 2201 2202 @Override 2203 public Future<Void> modifyNamespaceAsync(final NamespaceDescriptor descriptor) 2204 throws IOException { 2205 ModifyNamespaceResponse response = 2206 executeCallable(new MasterCallable<ModifyNamespaceResponse>(getConnection(), 2207 getRpcControllerFactory()) { 2208 @Override 2209 protected ModifyNamespaceResponse rpcCall() throws Exception { 2210 // TODO: set priority based on NS? 2211 return master.modifyNamespace(getRpcController(), ModifyNamespaceRequest.newBuilder(). 2212 setNamespaceDescriptor(ProtobufUtil.toProtoNamespaceDescriptor(descriptor)).build()); 2213 } 2214 }); 2215 return new NamespaceFuture(this, descriptor.getName(), response.getProcId()) { 2216 @Override 2217 public String getOperationType() { 2218 return "MODIFY_NAMESPACE"; 2219 } 2220 }; 2221 } 2222 2223 @Override 2224 public void deleteNamespace(final String name) 2225 throws IOException { 2226 get(deleteNamespaceAsync(name), this.syncWaitTimeout, TimeUnit.MILLISECONDS); 2227 } 2228 2229 @Override 2230 public Future<Void> deleteNamespaceAsync(final String name) 2231 throws IOException { 2232 DeleteNamespaceResponse response = 2233 executeCallable(new MasterCallable<DeleteNamespaceResponse>(getConnection(), 2234 getRpcControllerFactory()) { 2235 @Override 2236 protected DeleteNamespaceResponse rpcCall() throws Exception { 2237 // TODO: set priority based on NS? 2238 return master.deleteNamespace(getRpcController(), DeleteNamespaceRequest.newBuilder(). 2239 setNamespaceName(name).build()); 2240 } 2241 }); 2242 return new NamespaceFuture(this, name, response.getProcId()) { 2243 @Override 2244 public String getOperationType() { 2245 return "DELETE_NAMESPACE"; 2246 } 2247 }; 2248 } 2249 2250 @Override 2251 public NamespaceDescriptor getNamespaceDescriptor(final String name) 2252 throws NamespaceNotFoundException, IOException { 2253 return executeCallable(new MasterCallable<NamespaceDescriptor>(getConnection(), 2254 getRpcControllerFactory()) { 2255 @Override 2256 protected NamespaceDescriptor rpcCall() throws Exception { 2257 return ProtobufUtil.toNamespaceDescriptor( 2258 master.getNamespaceDescriptor(getRpcController(), 2259 GetNamespaceDescriptorRequest.newBuilder(). 2260 setNamespaceName(name).build()).getNamespaceDescriptor()); 2261 } 2262 }); 2263 } 2264 2265 @Override 2266 public NamespaceDescriptor[] listNamespaceDescriptors() throws IOException { 2267 return executeCallable(new MasterCallable<NamespaceDescriptor[]>(getConnection(), 2268 getRpcControllerFactory()) { 2269 @Override 2270 protected NamespaceDescriptor[] rpcCall() throws Exception { 2271 List<HBaseProtos.NamespaceDescriptor> list = 2272 master.listNamespaceDescriptors(getRpcController(), 2273 ListNamespaceDescriptorsRequest.newBuilder().build()).getNamespaceDescriptorList(); 2274 NamespaceDescriptor[] res = new NamespaceDescriptor[list.size()]; 2275 for(int i = 0; i < list.size(); i++) { 2276 res[i] = ProtobufUtil.toNamespaceDescriptor(list.get(i)); 2277 } 2278 return res; 2279 } 2280 }); 2281 } 2282 2283 @Override 2284 public String getProcedures() throws IOException { 2285 return executeCallable(new MasterCallable<String>(getConnection(), 2286 getRpcControllerFactory()) { 2287 @Override 2288 protected String rpcCall() throws Exception { 2289 GetProceduresRequest request = GetProceduresRequest.newBuilder().build(); 2290 GetProceduresResponse response = master.getProcedures(getRpcController(), request); 2291 return ProtobufUtil.toProcedureJson(response.getProcedureList()); 2292 } 2293 }); 2294 } 2295 2296 @Override 2297 public String getLocks() throws IOException { 2298 return executeCallable(new MasterCallable<String>(getConnection(), 2299 getRpcControllerFactory()) { 2300 @Override 2301 protected String rpcCall() throws Exception { 2302 GetLocksRequest request = GetLocksRequest.newBuilder().build(); 2303 GetLocksResponse response = master.getLocks(getRpcController(), request); 2304 return ProtobufUtil.toLockJson(response.getLockList()); 2305 } 2306 }); 2307 } 2308 2309 @Override 2310 public HTableDescriptor[] listTableDescriptorsByNamespace(final String name) throws IOException { 2311 return executeCallable(new MasterCallable<HTableDescriptor[]>(getConnection(), 2312 getRpcControllerFactory()) { 2313 @Override 2314 protected HTableDescriptor[] rpcCall() throws Exception { 2315 List<TableSchema> list = 2316 master.listTableDescriptorsByNamespace(getRpcController(), 2317 ListTableDescriptorsByNamespaceRequest.newBuilder().setNamespaceName(name) 2318 .build()).getTableSchemaList(); 2319 HTableDescriptor[] res = new HTableDescriptor[list.size()]; 2320 for(int i=0; i < list.size(); i++) { 2321 res[i] = new ImmutableHTableDescriptor(ProtobufUtil.toTableDescriptor(list.get(i))); 2322 } 2323 return res; 2324 } 2325 }); 2326 } 2327 2328 @Override 2329 public TableName[] listTableNamesByNamespace(final String name) throws IOException { 2330 return executeCallable(new MasterCallable<TableName[]>(getConnection(), 2331 getRpcControllerFactory()) { 2332 @Override 2333 protected TableName[] rpcCall() throws Exception { 2334 List<HBaseProtos.TableName> tableNames = 2335 master.listTableNamesByNamespace(getRpcController(), ListTableNamesByNamespaceRequest. 2336 newBuilder().setNamespaceName(name).build()) 2337 .getTableNameList(); 2338 TableName[] result = new TableName[tableNames.size()]; 2339 for (int i = 0; i < tableNames.size(); i++) { 2340 result[i] = ProtobufUtil.toTableName(tableNames.get(i)); 2341 } 2342 return result; 2343 } 2344 }); 2345 } 2346 2347 /** 2348 * Is HBase available? Throw an exception if not. 2349 * @param conf system configuration 2350 * @throws MasterNotRunningException if the master is not running. 2351 * @throws ZooKeeperConnectionException if unable to connect to zookeeper. // TODO do not expose 2352 * ZKConnectionException. 2353 */ 2354 public static void available(final Configuration conf) 2355 throws MasterNotRunningException, ZooKeeperConnectionException, IOException { 2356 Configuration copyOfConf = HBaseConfiguration.create(conf); 2357 // We set it to make it fail as soon as possible if HBase is not available 2358 copyOfConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1); 2359 copyOfConf.setInt("zookeeper.recovery.retry", 0); 2360 2361 // Check ZK first. 2362 // If the connection exists, we may have a connection to ZK that does not work anymore 2363 try (ClusterConnection connection = 2364 (ClusterConnection) ConnectionFactory.createConnection(copyOfConf)) { 2365 // can throw MasterNotRunningException 2366 connection.isMasterRunning(); 2367 } 2368 } 2369 2370 /** 2371 * 2372 * @param tableName 2373 * @return List of {@link HRegionInfo}. 2374 * @throws IOException 2375 * @deprecated As of release 2.0.0, this will be removed in HBase 3.0.0 2376 * Use {@link #getRegions(TableName)}. 2377 */ 2378 @Deprecated 2379 @Override 2380 public List<HRegionInfo> getTableRegions(final TableName tableName) 2381 throws IOException { 2382 return getRegions(tableName).stream() 2383 .map(ImmutableHRegionInfo::new) 2384 .collect(Collectors.toList()); 2385 } 2386 2387 @Override 2388 public synchronized void close() throws IOException { 2389 } 2390 2391 @Override 2392 public HTableDescriptor[] getTableDescriptorsByTableName(final List<TableName> tableNames) 2393 throws IOException { 2394 return executeCallable(new MasterCallable<HTableDescriptor[]>(getConnection(), 2395 getRpcControllerFactory()) { 2396 @Override 2397 protected HTableDescriptor[] rpcCall() throws Exception { 2398 GetTableDescriptorsRequest req = 2399 RequestConverter.buildGetTableDescriptorsRequest(tableNames); 2400 return ProtobufUtil 2401 .toTableDescriptorList(master.getTableDescriptors(getRpcController(), req)).stream() 2402 .map(ImmutableHTableDescriptor::new).toArray(HTableDescriptor[]::new); 2403 } 2404 }); 2405 } 2406 2407 @Override 2408 public HTableDescriptor[] getTableDescriptors(List<String> names) 2409 throws IOException { 2410 List<TableName> tableNames = new ArrayList<>(names.size()); 2411 for(String name : names) { 2412 tableNames.add(TableName.valueOf(name)); 2413 } 2414 return getTableDescriptorsByTableName(tableNames); 2415 } 2416 2417 private RollWALWriterResponse rollWALWriterImpl(final ServerName sn) throws IOException, 2418 FailedLogCloseException { 2419 final AdminService.BlockingInterface admin = this.connection.getAdmin(sn); 2420 RollWALWriterRequest request = RequestConverter.buildRollWALWriterRequest(); 2421 // TODO: There is no timeout on this controller. Set one! 2422 HBaseRpcController controller = rpcControllerFactory.newController(); 2423 try { 2424 return admin.rollWALWriter(controller, request); 2425 } catch (ServiceException e) { 2426 throw ProtobufUtil.handleRemoteException(e); 2427 } 2428 } 2429 2430 /** 2431 * Roll the log writer. I.e. when using a file system based write ahead log, 2432 * start writing log messages to a new file. 2433 * 2434 * Note that when talking to a version 1.0+ HBase deployment, the rolling is asynchronous. 2435 * This method will return as soon as the roll is requested and the return value will 2436 * always be null. Additionally, the named region server may schedule store flushes at the 2437 * request of the wal handling the roll request. 2438 * 2439 * When talking to a 0.98 or older HBase deployment, the rolling is synchronous and the 2440 * return value may be either null or a list of encoded region names. 2441 * 2442 * @param serverName 2443 * The servername of the regionserver. A server name is made of host, 2444 * port and startcode. This is mandatory. Here is an example: 2445 * <code> host187.example.com,60020,1289493121758</code> 2446 * @return a set of {@link HRegionInfo#getEncodedName()} that would allow the wal to 2447 * clean up some underlying files. null if there's nothing to flush. 2448 * @throws IOException if a remote or network exception occurs 2449 * @throws FailedLogCloseException 2450 * @deprecated use {@link #rollWALWriter(ServerName)} 2451 */ 2452 @Deprecated 2453 public synchronized byte[][] rollHLogWriter(String serverName) 2454 throws IOException, FailedLogCloseException { 2455 ServerName sn = ServerName.valueOf(serverName); 2456 final RollWALWriterResponse response = rollWALWriterImpl(sn); 2457 int regionCount = response.getRegionToFlushCount(); 2458 if (0 == regionCount) { 2459 return null; 2460 } 2461 byte[][] regionsToFlush = new byte[regionCount][]; 2462 for (int i = 0; i < regionCount; i++) { 2463 regionsToFlush[i] = ProtobufUtil.toBytes(response.getRegionToFlush(i)); 2464 } 2465 return regionsToFlush; 2466 } 2467 2468 @Override 2469 public synchronized void rollWALWriter(ServerName serverName) 2470 throws IOException, FailedLogCloseException { 2471 rollWALWriterImpl(serverName); 2472 } 2473 2474 @Override 2475 public CompactionState getCompactionState(final TableName tableName) 2476 throws IOException { 2477 return getCompactionState(tableName, CompactType.NORMAL); 2478 } 2479 2480 @Override 2481 public CompactionState getCompactionStateForRegion(final byte[] regionName) 2482 throws IOException { 2483 final Pair<RegionInfo, ServerName> regionServerPair = getRegion(regionName); 2484 if (regionServerPair == null) { 2485 throw new IllegalArgumentException("Invalid region: " + Bytes.toStringBinary(regionName)); 2486 } 2487 if (regionServerPair.getSecond() == null) { 2488 throw new NoServerForRegionException(Bytes.toStringBinary(regionName)); 2489 } 2490 ServerName sn = regionServerPair.getSecond(); 2491 final AdminService.BlockingInterface admin = this.connection.getAdmin(sn); 2492 // TODO: There is no timeout on this controller. Set one! 2493 HBaseRpcController controller = rpcControllerFactory.newController(); 2494 GetRegionInfoRequest request = RequestConverter.buildGetRegionInfoRequest( 2495 regionServerPair.getFirst().getRegionName(), true); 2496 GetRegionInfoResponse response; 2497 try { 2498 response = admin.getRegionInfo(controller, request); 2499 } catch (ServiceException e) { 2500 throw ProtobufUtil.handleRemoteException(e); 2501 } 2502 if (response.getCompactionState() != null) { 2503 return ProtobufUtil.createCompactionState(response.getCompactionState()); 2504 } 2505 return null; 2506 } 2507 2508 @Override 2509 public void snapshot(final String snapshotName, 2510 final TableName tableName) throws IOException, 2511 SnapshotCreationException, IllegalArgumentException { 2512 snapshot(snapshotName, tableName, SnapshotType.FLUSH); 2513 } 2514 2515 @Override 2516 public void snapshot(final byte[] snapshotName, final TableName tableName) 2517 throws IOException, SnapshotCreationException, IllegalArgumentException { 2518 snapshot(Bytes.toString(snapshotName), tableName, SnapshotType.FLUSH); 2519 } 2520 2521 @Override 2522 public void snapshot(final String snapshotName, final TableName tableName, 2523 SnapshotType type) 2524 throws IOException, SnapshotCreationException, IllegalArgumentException { 2525 snapshot(new SnapshotDescription(snapshotName, tableName, type)); 2526 } 2527 2528 @Override 2529 public void snapshot(SnapshotDescription snapshotDesc) 2530 throws IOException, SnapshotCreationException, IllegalArgumentException { 2531 // actually take the snapshot 2532 SnapshotProtos.SnapshotDescription snapshot = 2533 ProtobufUtil.createHBaseProtosSnapshotDesc(snapshotDesc); 2534 SnapshotResponse response = asyncSnapshot(snapshot); 2535 final IsSnapshotDoneRequest request = 2536 IsSnapshotDoneRequest.newBuilder().setSnapshot(snapshot).build(); 2537 IsSnapshotDoneResponse done = null; 2538 long start = EnvironmentEdgeManager.currentTime(); 2539 long max = response.getExpectedTimeout(); 2540 long maxPauseTime = max / this.numRetries; 2541 int tries = 0; 2542 LOG.debug("Waiting a max of " + max + " ms for snapshot '" + 2543 ClientSnapshotDescriptionUtils.toString(snapshot) + "'' to complete. (max " + 2544 maxPauseTime + " ms per retry)"); 2545 while (tries == 0 2546 || ((EnvironmentEdgeManager.currentTime() - start) < max && !done.getDone())) { 2547 try { 2548 // sleep a backoff <= pauseTime amount 2549 long sleep = getPauseTime(tries++); 2550 sleep = sleep > maxPauseTime ? maxPauseTime : sleep; 2551 LOG.debug("(#" + tries + ") Sleeping: " + sleep + 2552 "ms while waiting for snapshot completion."); 2553 Thread.sleep(sleep); 2554 } catch (InterruptedException e) { 2555 throw (InterruptedIOException)new InterruptedIOException("Interrupted").initCause(e); 2556 } 2557 LOG.debug("Getting current status of snapshot from master..."); 2558 done = executeCallable(new MasterCallable<IsSnapshotDoneResponse>(getConnection(), 2559 getRpcControllerFactory()) { 2560 @Override 2561 protected IsSnapshotDoneResponse rpcCall() throws Exception { 2562 return master.isSnapshotDone(getRpcController(), request); 2563 } 2564 }); 2565 } 2566 if (!done.getDone()) { 2567 throw new SnapshotCreationException("Snapshot '" + snapshot.getName() 2568 + "' wasn't completed in expectedTime:" + max + " ms", snapshotDesc); 2569 } 2570 } 2571 2572 @Override 2573 public void snapshotAsync(SnapshotDescription snapshotDesc) throws IOException, 2574 SnapshotCreationException { 2575 asyncSnapshot(ProtobufUtil.createHBaseProtosSnapshotDesc(snapshotDesc)); 2576 } 2577 2578 private SnapshotResponse asyncSnapshot(SnapshotProtos.SnapshotDescription snapshot) 2579 throws IOException { 2580 ClientSnapshotDescriptionUtils.assertSnapshotRequestIsValid(snapshot); 2581 final SnapshotRequest request = SnapshotRequest.newBuilder().setSnapshot(snapshot) 2582 .build(); 2583 // run the snapshot on the master 2584 return executeCallable(new MasterCallable<SnapshotResponse>(getConnection(), 2585 getRpcControllerFactory()) { 2586 @Override 2587 protected SnapshotResponse rpcCall() throws Exception { 2588 return master.snapshot(getRpcController(), request); 2589 } 2590 }); 2591 } 2592 2593 @Override 2594 public boolean isSnapshotFinished(final SnapshotDescription snapshotDesc) 2595 throws IOException, HBaseSnapshotException, UnknownSnapshotException { 2596 final SnapshotProtos.SnapshotDescription snapshot = 2597 ProtobufUtil.createHBaseProtosSnapshotDesc(snapshotDesc); 2598 return executeCallable(new MasterCallable<IsSnapshotDoneResponse>(getConnection(), 2599 getRpcControllerFactory()) { 2600 @Override 2601 protected IsSnapshotDoneResponse rpcCall() throws Exception { 2602 return master.isSnapshotDone(getRpcController(), 2603 IsSnapshotDoneRequest.newBuilder().setSnapshot(snapshot).build()); 2604 } 2605 }).getDone(); 2606 } 2607 2608 @Override 2609 public void restoreSnapshot(final byte[] snapshotName) 2610 throws IOException, RestoreSnapshotException { 2611 restoreSnapshot(Bytes.toString(snapshotName)); 2612 } 2613 2614 @Override 2615 public void restoreSnapshot(final String snapshotName) 2616 throws IOException, RestoreSnapshotException { 2617 boolean takeFailSafeSnapshot = 2618 conf.getBoolean(HConstants.SNAPSHOT_RESTORE_TAKE_FAILSAFE_SNAPSHOT, 2619 HConstants.DEFAULT_SNAPSHOT_RESTORE_TAKE_FAILSAFE_SNAPSHOT); 2620 restoreSnapshot(snapshotName, takeFailSafeSnapshot); 2621 } 2622 2623 @Override 2624 public void restoreSnapshot(final byte[] snapshotName, final boolean takeFailSafeSnapshot) 2625 throws IOException, RestoreSnapshotException { 2626 restoreSnapshot(Bytes.toString(snapshotName), takeFailSafeSnapshot); 2627 } 2628 2629 /* 2630 * Check whether the snapshot exists and contains disabled table 2631 * 2632 * @param snapshotName name of the snapshot to restore 2633 * @throws IOException if a remote or network exception occurs 2634 * @throws RestoreSnapshotException if no valid snapshot is found 2635 */ 2636 private TableName getTableNameBeforeRestoreSnapshot(final String snapshotName) 2637 throws IOException, RestoreSnapshotException { 2638 TableName tableName = null; 2639 for (SnapshotDescription snapshotInfo: listSnapshots()) { 2640 if (snapshotInfo.getName().equals(snapshotName)) { 2641 tableName = snapshotInfo.getTableName(); 2642 break; 2643 } 2644 } 2645 2646 if (tableName == null) { 2647 throw new RestoreSnapshotException( 2648 "Unable to find the table name for snapshot=" + snapshotName); 2649 } 2650 return tableName; 2651 } 2652 2653 @Override 2654 public void restoreSnapshot(String snapshotName, boolean takeFailSafeSnapshot) 2655 throws IOException, RestoreSnapshotException { 2656 restoreSnapshot(snapshotName, takeFailSafeSnapshot, false); 2657 } 2658 2659 @Override 2660 public void restoreSnapshot(final String snapshotName, final boolean takeFailSafeSnapshot, 2661 final boolean restoreAcl) throws IOException, RestoreSnapshotException { 2662 TableName tableName = getTableNameBeforeRestoreSnapshot(snapshotName); 2663 2664 // The table does not exists, switch to clone. 2665 if (!tableExists(tableName)) { 2666 cloneSnapshot(snapshotName, tableName, restoreAcl); 2667 return; 2668 } 2669 2670 // Check if the table is disabled 2671 if (!isTableDisabled(tableName)) { 2672 throw new TableNotDisabledException(tableName); 2673 } 2674 2675 // Take a snapshot of the current state 2676 String failSafeSnapshotSnapshotName = null; 2677 if (takeFailSafeSnapshot) { 2678 failSafeSnapshotSnapshotName = conf.get("hbase.snapshot.restore.failsafe.name", 2679 "hbase-failsafe-{snapshot.name}-{restore.timestamp}"); 2680 failSafeSnapshotSnapshotName = failSafeSnapshotSnapshotName 2681 .replace("{snapshot.name}", snapshotName) 2682 .replace("{table.name}", tableName.toString().replace(TableName.NAMESPACE_DELIM, '.')) 2683 .replace("{restore.timestamp}", String.valueOf(EnvironmentEdgeManager.currentTime())); 2684 LOG.info("Taking restore-failsafe snapshot: " + failSafeSnapshotSnapshotName); 2685 snapshot(failSafeSnapshotSnapshotName, tableName); 2686 } 2687 2688 try { 2689 // Restore snapshot 2690 get( 2691 internalRestoreSnapshotAsync(snapshotName, tableName, restoreAcl), 2692 syncWaitTimeout, 2693 TimeUnit.MILLISECONDS); 2694 } catch (IOException e) { 2695 // Something went wrong during the restore... 2696 // if the pre-restore snapshot is available try to rollback 2697 if (takeFailSafeSnapshot) { 2698 try { 2699 get( 2700 internalRestoreSnapshotAsync(failSafeSnapshotSnapshotName, tableName, restoreAcl), 2701 syncWaitTimeout, 2702 TimeUnit.MILLISECONDS); 2703 String msg = "Restore snapshot=" + snapshotName + 2704 " failed. Rollback to snapshot=" + failSafeSnapshotSnapshotName + " succeeded."; 2705 LOG.error(msg, e); 2706 throw new RestoreSnapshotException(msg, e); 2707 } catch (IOException ex) { 2708 String msg = "Failed to restore and rollback to snapshot=" + failSafeSnapshotSnapshotName; 2709 LOG.error(msg, ex); 2710 throw new RestoreSnapshotException(msg, e); 2711 } 2712 } else { 2713 throw new RestoreSnapshotException("Failed to restore snapshot=" + snapshotName, e); 2714 } 2715 } 2716 2717 // If the restore is succeeded, delete the pre-restore snapshot 2718 if (takeFailSafeSnapshot) { 2719 try { 2720 LOG.info("Deleting restore-failsafe snapshot: " + failSafeSnapshotSnapshotName); 2721 deleteSnapshot(failSafeSnapshotSnapshotName); 2722 } catch (IOException e) { 2723 LOG.error("Unable to remove the failsafe snapshot: " + failSafeSnapshotSnapshotName, e); 2724 } 2725 } 2726 } 2727 2728 @Override 2729 public Future<Void> restoreSnapshotAsync(final String snapshotName) 2730 throws IOException, RestoreSnapshotException { 2731 TableName tableName = getTableNameBeforeRestoreSnapshot(snapshotName); 2732 2733 // The table does not exists, switch to clone. 2734 if (!tableExists(tableName)) { 2735 return cloneSnapshotAsync(snapshotName, tableName); 2736 } 2737 2738 // Check if the table is disabled 2739 if (!isTableDisabled(tableName)) { 2740 throw new TableNotDisabledException(tableName); 2741 } 2742 2743 return internalRestoreSnapshotAsync(snapshotName, tableName, false); 2744 } 2745 2746 @Override 2747 public void cloneSnapshot(final byte[] snapshotName, final TableName tableName) 2748 throws IOException, TableExistsException, RestoreSnapshotException { 2749 cloneSnapshot(Bytes.toString(snapshotName), tableName); 2750 } 2751 2752 @Override 2753 public void cloneSnapshot(String snapshotName, TableName tableName, boolean restoreAcl) 2754 throws IOException, TableExistsException, RestoreSnapshotException { 2755 if (tableExists(tableName)) { 2756 throw new TableExistsException(tableName); 2757 } 2758 get( 2759 internalRestoreSnapshotAsync(snapshotName, tableName, restoreAcl), 2760 Integer.MAX_VALUE, 2761 TimeUnit.MILLISECONDS); 2762 } 2763 2764 @Override 2765 public void cloneSnapshot(final String snapshotName, final TableName tableName) 2766 throws IOException, TableExistsException, RestoreSnapshotException { 2767 cloneSnapshot(snapshotName, tableName, false); 2768 } 2769 2770 @Override 2771 public Future<Void> cloneSnapshotAsync(final String snapshotName, final TableName tableName) 2772 throws IOException, TableExistsException { 2773 if (tableExists(tableName)) { 2774 throw new TableExistsException(tableName); 2775 } 2776 return internalRestoreSnapshotAsync(snapshotName, tableName, false); 2777 } 2778 2779 @Override 2780 public byte[] execProcedureWithReturn(String signature, String instance, Map<String, 2781 String> props) throws IOException { 2782 ProcedureDescription desc = ProtobufUtil.buildProcedureDescription(signature, instance, props); 2783 final ExecProcedureRequest request = 2784 ExecProcedureRequest.newBuilder().setProcedure(desc).build(); 2785 // run the procedure on the master 2786 ExecProcedureResponse response = executeCallable( 2787 new MasterCallable<ExecProcedureResponse>(getConnection(), getRpcControllerFactory()) { 2788 @Override 2789 protected ExecProcedureResponse rpcCall() throws Exception { 2790 return master.execProcedureWithRet(getRpcController(), request); 2791 } 2792 }); 2793 2794 return response.hasReturnData() ? response.getReturnData().toByteArray() : null; 2795 } 2796 2797 @Override 2798 public void execProcedure(String signature, String instance, Map<String, String> props) 2799 throws IOException { 2800 ProcedureDescription desc = ProtobufUtil.buildProcedureDescription(signature, instance, props); 2801 final ExecProcedureRequest request = 2802 ExecProcedureRequest.newBuilder().setProcedure(desc).build(); 2803 // run the procedure on the master 2804 ExecProcedureResponse response = executeCallable(new MasterCallable<ExecProcedureResponse>( 2805 getConnection(), getRpcControllerFactory()) { 2806 @Override 2807 protected ExecProcedureResponse rpcCall() throws Exception { 2808 return master.execProcedure(getRpcController(), request); 2809 } 2810 }); 2811 2812 long start = EnvironmentEdgeManager.currentTime(); 2813 long max = response.getExpectedTimeout(); 2814 long maxPauseTime = max / this.numRetries; 2815 int tries = 0; 2816 LOG.debug("Waiting a max of " + max + " ms for procedure '" + 2817 signature + " : " + instance + "'' to complete. (max " + maxPauseTime + " ms per retry)"); 2818 boolean done = false; 2819 while (tries == 0 2820 || ((EnvironmentEdgeManager.currentTime() - start) < max && !done)) { 2821 try { 2822 // sleep a backoff <= pauseTime amount 2823 long sleep = getPauseTime(tries++); 2824 sleep = sleep > maxPauseTime ? maxPauseTime : sleep; 2825 LOG.debug("(#" + tries + ") Sleeping: " + sleep + 2826 "ms while waiting for procedure completion."); 2827 Thread.sleep(sleep); 2828 } catch (InterruptedException e) { 2829 throw (InterruptedIOException)new InterruptedIOException("Interrupted").initCause(e); 2830 } 2831 LOG.debug("Getting current status of procedure from master..."); 2832 done = isProcedureFinished(signature, instance, props); 2833 } 2834 if (!done) { 2835 throw new IOException("Procedure '" + signature + " : " + instance 2836 + "' wasn't completed in expectedTime:" + max + " ms"); 2837 } 2838 } 2839 2840 @Override 2841 public boolean isProcedureFinished(String signature, String instance, Map<String, String> props) 2842 throws IOException { 2843 ProcedureDescription desc = ProtobufUtil.buildProcedureDescription(signature, instance, props); 2844 return executeCallable( 2845 new MasterCallable<IsProcedureDoneResponse>(getConnection(), getRpcControllerFactory()) { 2846 @Override 2847 protected IsProcedureDoneResponse rpcCall() throws Exception { 2848 return master.isProcedureDone(getRpcController(), 2849 IsProcedureDoneRequest.newBuilder().setProcedure(desc).build()); 2850 } 2851 }).getDone(); 2852 } 2853 2854 /** 2855 * Execute Restore/Clone snapshot and wait for the server to complete (blocking). 2856 * To check if the cloned table exists, use {@link #isTableAvailable} -- it is not safe to 2857 * create an HTable instance to this table before it is available. 2858 * @param snapshotName snapshot to restore 2859 * @param tableName table name to restore the snapshot on 2860 * @throws IOException if a remote or network exception occurs 2861 * @throws RestoreSnapshotException if snapshot failed to be restored 2862 * @throws IllegalArgumentException if the restore request is formatted incorrectly 2863 */ 2864 private Future<Void> internalRestoreSnapshotAsync(final String snapshotName, 2865 final TableName tableName, final boolean restoreAcl) 2866 throws IOException, RestoreSnapshotException { 2867 final SnapshotProtos.SnapshotDescription snapshot = 2868 SnapshotProtos.SnapshotDescription.newBuilder() 2869 .setName(snapshotName).setTable(tableName.getNameAsString()).build(); 2870 2871 // actually restore the snapshot 2872 ClientSnapshotDescriptionUtils.assertSnapshotRequestIsValid(snapshot); 2873 2874 RestoreSnapshotResponse response = executeCallable( 2875 new MasterCallable<RestoreSnapshotResponse>(getConnection(), getRpcControllerFactory()) { 2876 Long nonceGroup = ng.getNonceGroup(); 2877 Long nonce = ng.newNonce(); 2878 @Override 2879 protected RestoreSnapshotResponse rpcCall() throws Exception { 2880 final RestoreSnapshotRequest request = RestoreSnapshotRequest.newBuilder() 2881 .setSnapshot(snapshot) 2882 .setNonceGroup(nonceGroup) 2883 .setNonce(nonce) 2884 .setRestoreACL(restoreAcl) 2885 .build(); 2886 return master.restoreSnapshot(getRpcController(), request); 2887 } 2888 }); 2889 2890 return new RestoreSnapshotFuture(this, snapshot, tableName, response); 2891 } 2892 2893 private static class RestoreSnapshotFuture extends TableFuture<Void> { 2894 public RestoreSnapshotFuture( 2895 final HBaseAdmin admin, 2896 final SnapshotProtos.SnapshotDescription snapshot, 2897 final TableName tableName, 2898 final RestoreSnapshotResponse response) { 2899 super(admin, tableName, 2900 (response != null && response.hasProcId()) ? response.getProcId() : null); 2901 2902 if (response != null && !response.hasProcId()) { 2903 throw new UnsupportedOperationException("Client could not call old version of Server"); 2904 } 2905 } 2906 2907 public RestoreSnapshotFuture( 2908 final HBaseAdmin admin, 2909 final TableName tableName, 2910 final Long procId) { 2911 super(admin, tableName, procId); 2912 } 2913 2914 @Override 2915 public String getOperationType() { 2916 return "MODIFY"; 2917 } 2918 } 2919 2920 @Override 2921 public List<SnapshotDescription> listSnapshots() throws IOException { 2922 return executeCallable(new MasterCallable<List<SnapshotDescription>>(getConnection(), 2923 getRpcControllerFactory()) { 2924 @Override 2925 protected List<SnapshotDescription> rpcCall() throws Exception { 2926 List<SnapshotProtos.SnapshotDescription> snapshotsList = master 2927 .getCompletedSnapshots(getRpcController(), 2928 GetCompletedSnapshotsRequest.newBuilder().build()) 2929 .getSnapshotsList(); 2930 List<SnapshotDescription> result = new ArrayList<>(snapshotsList.size()); 2931 for (SnapshotProtos.SnapshotDescription snapshot : snapshotsList) { 2932 result.add(ProtobufUtil.createSnapshotDesc(snapshot)); 2933 } 2934 return result; 2935 } 2936 }); 2937 } 2938 2939 @Override 2940 public List<SnapshotDescription> listSnapshots(String regex) throws IOException { 2941 return listSnapshots(Pattern.compile(regex)); 2942 } 2943 2944 @Override 2945 public List<SnapshotDescription> listSnapshots(Pattern pattern) throws IOException { 2946 List<SnapshotDescription> matched = new LinkedList<>(); 2947 List<SnapshotDescription> snapshots = listSnapshots(); 2948 for (SnapshotDescription snapshot : snapshots) { 2949 if (pattern.matcher(snapshot.getName()).matches()) { 2950 matched.add(snapshot); 2951 } 2952 } 2953 return matched; 2954 } 2955 2956 @Override 2957 public List<SnapshotDescription> listTableSnapshots(String tableNameRegex, 2958 String snapshotNameRegex) throws IOException { 2959 return listTableSnapshots(Pattern.compile(tableNameRegex), Pattern.compile(snapshotNameRegex)); 2960 } 2961 2962 @Override 2963 public List<SnapshotDescription> listTableSnapshots(Pattern tableNamePattern, 2964 Pattern snapshotNamePattern) throws IOException { 2965 TableName[] tableNames = listTableNames(tableNamePattern); 2966 2967 List<SnapshotDescription> tableSnapshots = new LinkedList<>(); 2968 List<SnapshotDescription> snapshots = listSnapshots(snapshotNamePattern); 2969 2970 List<TableName> listOfTableNames = Arrays.asList(tableNames); 2971 for (SnapshotDescription snapshot : snapshots) { 2972 if (listOfTableNames.contains(snapshot.getTableName())) { 2973 tableSnapshots.add(snapshot); 2974 } 2975 } 2976 return tableSnapshots; 2977 } 2978 2979 @Override 2980 public void deleteSnapshot(final byte[] snapshotName) throws IOException { 2981 deleteSnapshot(Bytes.toString(snapshotName)); 2982 } 2983 2984 @Override 2985 public void deleteSnapshot(final String snapshotName) throws IOException { 2986 // make sure the snapshot is possibly valid 2987 TableName.isLegalFullyQualifiedTableName(Bytes.toBytes(snapshotName)); 2988 // do the delete 2989 executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) { 2990 @Override 2991 protected Void rpcCall() throws Exception { 2992 master.deleteSnapshot(getRpcController(), 2993 DeleteSnapshotRequest.newBuilder().setSnapshot( 2994 SnapshotProtos.SnapshotDescription.newBuilder().setName(snapshotName).build()) 2995 .build() 2996 ); 2997 return null; 2998 } 2999 }); 3000 } 3001 3002 @Override 3003 public void deleteSnapshots(final String regex) throws IOException { 3004 deleteSnapshots(Pattern.compile(regex)); 3005 } 3006 3007 @Override 3008 public void deleteSnapshots(final Pattern pattern) throws IOException { 3009 List<SnapshotDescription> snapshots = listSnapshots(pattern); 3010 for (final SnapshotDescription snapshot : snapshots) { 3011 try { 3012 internalDeleteSnapshot(snapshot); 3013 } catch (IOException ex) { 3014 LOG.info("Failed to delete snapshot " + snapshot.getName() + " for table " 3015 + snapshot.getTableNameAsString(), ex); 3016 } 3017 } 3018 } 3019 3020 private void internalDeleteSnapshot(final SnapshotDescription snapshot) throws IOException { 3021 executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) { 3022 @Override 3023 protected Void rpcCall() throws Exception { 3024 this.master.deleteSnapshot(getRpcController(), DeleteSnapshotRequest.newBuilder() 3025 .setSnapshot(ProtobufUtil.createHBaseProtosSnapshotDesc(snapshot)).build()); 3026 return null; 3027 } 3028 }); 3029 } 3030 3031 @Override 3032 public void deleteTableSnapshots(String tableNameRegex, String snapshotNameRegex) 3033 throws IOException { 3034 deleteTableSnapshots(Pattern.compile(tableNameRegex), Pattern.compile(snapshotNameRegex)); 3035 } 3036 3037 @Override 3038 public void deleteTableSnapshots(Pattern tableNamePattern, Pattern snapshotNamePattern) 3039 throws IOException { 3040 List<SnapshotDescription> snapshots = listTableSnapshots(tableNamePattern, snapshotNamePattern); 3041 for (SnapshotDescription snapshot : snapshots) { 3042 try { 3043 internalDeleteSnapshot(snapshot); 3044 LOG.debug("Successfully deleted snapshot: " + snapshot.getName()); 3045 } catch (IOException e) { 3046 LOG.error("Failed to delete snapshot: " + snapshot.getName(), e); 3047 } 3048 } 3049 } 3050 3051 @Override 3052 public void setQuota(final QuotaSettings quota) throws IOException { 3053 executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) { 3054 @Override 3055 protected Void rpcCall() throws Exception { 3056 this.master.setQuota(getRpcController(), QuotaSettings.buildSetQuotaRequestProto(quota)); 3057 return null; 3058 } 3059 }); 3060 } 3061 3062 @Override 3063 public QuotaRetriever getQuotaRetriever(final QuotaFilter filter) throws IOException { 3064 return QuotaRetriever.open(conf, filter); 3065 } 3066 3067 @Override 3068 public List<QuotaSettings> getQuota(QuotaFilter filter) throws IOException { 3069 List<QuotaSettings> quotas = new LinkedList<>(); 3070 try (QuotaRetriever retriever = QuotaRetriever.open(conf, filter)) { 3071 Iterator<QuotaSettings> iterator = retriever.iterator(); 3072 while (iterator.hasNext()) { 3073 quotas.add(iterator.next()); 3074 } 3075 } 3076 return quotas; 3077 } 3078 3079 private <C extends RetryingCallable<V> & Closeable, V> V executeCallable(C callable) 3080 throws IOException { 3081 return executeCallable(callable, rpcCallerFactory, operationTimeout, rpcTimeout); 3082 } 3083 3084 static private <C extends RetryingCallable<V> & Closeable, V> V executeCallable(C callable, 3085 RpcRetryingCallerFactory rpcCallerFactory, int operationTimeout, int rpcTimeout) 3086 throws IOException { 3087 RpcRetryingCaller<V> caller = rpcCallerFactory.newCaller(rpcTimeout); 3088 try { 3089 return caller.callWithRetries(callable, operationTimeout); 3090 } finally { 3091 callable.close(); 3092 } 3093 } 3094 3095 @Override 3096 // Coprocessor Endpoint against the Master. 3097 public CoprocessorRpcChannel coprocessorService() { 3098 return new SyncCoprocessorRpcChannel() { 3099 @Override 3100 protected Message callExecService(final RpcController controller, 3101 final Descriptors.MethodDescriptor method, final Message request, 3102 final Message responsePrototype) 3103 throws IOException { 3104 if (LOG.isTraceEnabled()) { 3105 LOG.trace("Call: " + method.getName() + ", " + request.toString()); 3106 } 3107 // Try-with-resources so close gets called when we are done. 3108 try (MasterCallable<CoprocessorServiceResponse> callable = 3109 new MasterCallable<CoprocessorServiceResponse>(connection, 3110 connection.getRpcControllerFactory()) { 3111 @Override 3112 protected CoprocessorServiceResponse rpcCall() throws Exception { 3113 CoprocessorServiceRequest csr = 3114 CoprocessorRpcUtils.getCoprocessorServiceRequest(method, request); 3115 return this.master.execMasterService(getRpcController(), csr); 3116 } 3117 }) { 3118 // TODO: Are we retrying here? Does not seem so. We should use RetryingRpcCaller 3119 callable.prepare(false); 3120 int operationTimeout = connection.getConnectionConfiguration().getOperationTimeout(); 3121 CoprocessorServiceResponse result = callable.call(operationTimeout); 3122 return CoprocessorRpcUtils.getResponse(result, responsePrototype); 3123 } 3124 } 3125 }; 3126 } 3127 3128 /** 3129 * Simple {@link Abortable}, throwing RuntimeException on abort. 3130 */ 3131 private static class ThrowableAbortable implements Abortable { 3132 @Override 3133 public void abort(String why, Throwable e) { 3134 throw new RuntimeException(why, e); 3135 } 3136 3137 @Override 3138 public boolean isAborted() { 3139 return true; 3140 } 3141 } 3142 3143 @Override 3144 public CoprocessorRpcChannel coprocessorService(final ServerName serverName) { 3145 return new SyncCoprocessorRpcChannel() { 3146 @Override 3147 protected Message callExecService(RpcController controller, 3148 Descriptors.MethodDescriptor method, Message request, Message responsePrototype) 3149 throws IOException { 3150 if (LOG.isTraceEnabled()) { 3151 LOG.trace("Call: " + method.getName() + ", " + request.toString()); 3152 } 3153 CoprocessorServiceRequest csr = 3154 CoprocessorRpcUtils.getCoprocessorServiceRequest(method, request); 3155 // TODO: Are we retrying here? Does not seem so. We should use RetryingRpcCaller 3156 // TODO: Make this same as RegionCoprocessorRpcChannel and MasterCoprocessorRpcChannel. They 3157 // are all different though should do same thing; e.g. RpcChannel setup. 3158 ClientProtos.ClientService.BlockingInterface stub = connection.getClient(serverName); 3159 CoprocessorServiceResponse result; 3160 try { 3161 result = stub. 3162 execRegionServerService(connection.getRpcControllerFactory().newController(), csr); 3163 return CoprocessorRpcUtils.getResponse(result, responsePrototype); 3164 } catch (ServiceException e) { 3165 throw ProtobufUtil.handleRemoteException(e); 3166 } 3167 } 3168 }; 3169 } 3170 3171 @Override 3172 public void updateConfiguration(final ServerName server) throws IOException { 3173 final AdminService.BlockingInterface admin = this.connection.getAdmin(server); 3174 Callable<Void> callable = new Callable<Void>() { 3175 @Override 3176 public Void call() throws Exception { 3177 admin.updateConfiguration(null, UpdateConfigurationRequest.getDefaultInstance()); 3178 return null; 3179 } 3180 }; 3181 ProtobufUtil.call(callable); 3182 } 3183 3184 @Override 3185 public void updateConfiguration() throws IOException { 3186 ClusterMetrics status = getClusterMetrics( 3187 EnumSet.of(Option.LIVE_SERVERS, Option.MASTER, Option.BACKUP_MASTERS)); 3188 for (ServerName server : status.getLiveServerMetrics().keySet()) { 3189 updateConfiguration(server); 3190 } 3191 3192 updateConfiguration(status.getMasterName()); 3193 3194 for (ServerName server : status.getBackupMasterNames()) { 3195 updateConfiguration(server); 3196 } 3197 } 3198 3199 @Override 3200 public long getLastMajorCompactionTimestamp(final TableName tableName) throws IOException { 3201 return executeCallable(new MasterCallable<Long>(getConnection(), getRpcControllerFactory()) { 3202 @Override 3203 protected Long rpcCall() throws Exception { 3204 MajorCompactionTimestampRequest req = 3205 MajorCompactionTimestampRequest.newBuilder() 3206 .setTableName(ProtobufUtil.toProtoTableName(tableName)).build(); 3207 return master.getLastMajorCompactionTimestamp(getRpcController(), req). 3208 getCompactionTimestamp(); 3209 } 3210 }); 3211 } 3212 3213 @Override 3214 public long getLastMajorCompactionTimestampForRegion(final byte[] regionName) throws IOException { 3215 return executeCallable(new MasterCallable<Long>(getConnection(), getRpcControllerFactory()) { 3216 @Override 3217 protected Long rpcCall() throws Exception { 3218 MajorCompactionTimestampForRegionRequest req = 3219 MajorCompactionTimestampForRegionRequest.newBuilder().setRegion(RequestConverter 3220 .buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName)).build(); 3221 return master.getLastMajorCompactionTimestampForRegion(getRpcController(), req) 3222 .getCompactionTimestamp(); 3223 } 3224 }); 3225 } 3226 3227 /** 3228 * {@inheritDoc} 3229 */ 3230 @Override 3231 public void compact(final TableName tableName, final byte[] columnFamily, CompactType compactType) 3232 throws IOException, InterruptedException { 3233 compact(tableName, columnFamily, false, compactType); 3234 } 3235 3236 /** 3237 * {@inheritDoc} 3238 */ 3239 @Override 3240 public void compact(final TableName tableName, CompactType compactType) 3241 throws IOException, InterruptedException { 3242 compact(tableName, null, false, compactType); 3243 } 3244 3245 /** 3246 * {@inheritDoc} 3247 */ 3248 @Override 3249 public void majorCompact(final TableName tableName, final byte[] columnFamily, 3250 CompactType compactType) throws IOException, InterruptedException { 3251 compact(tableName, columnFamily, true, compactType); 3252 } 3253 3254 /** 3255 * {@inheritDoc} 3256 */ 3257 @Override 3258 public void majorCompact(final TableName tableName, CompactType compactType) 3259 throws IOException, InterruptedException { 3260 compact(tableName, null, true, compactType); 3261 } 3262 3263 /** 3264 * {@inheritDoc} 3265 */ 3266 @Override 3267 public CompactionState getCompactionState(final TableName tableName, CompactType compactType) 3268 throws IOException { 3269 AdminProtos.GetRegionInfoResponse.CompactionState state = 3270 AdminProtos.GetRegionInfoResponse.CompactionState.NONE; 3271 checkTableExists(tableName); 3272 // TODO: There is no timeout on this controller. Set one! 3273 HBaseRpcController rpcController = rpcControllerFactory.newController(); 3274 switch (compactType) { 3275 case MOB: 3276 final AdminProtos.AdminService.BlockingInterface masterAdmin = 3277 this.connection.getAdminForMaster(); 3278 Callable<AdminProtos.GetRegionInfoResponse.CompactionState> callable = 3279 new Callable<AdminProtos.GetRegionInfoResponse.CompactionState>() { 3280 @Override 3281 public AdminProtos.GetRegionInfoResponse.CompactionState call() throws Exception { 3282 RegionInfo info = RegionInfo.createMobRegionInfo(tableName); 3283 GetRegionInfoRequest request = 3284 RequestConverter.buildGetRegionInfoRequest(info.getRegionName(), true); 3285 GetRegionInfoResponse response = masterAdmin.getRegionInfo(rpcController, request); 3286 return response.getCompactionState(); 3287 } 3288 }; 3289 state = ProtobufUtil.call(callable); 3290 break; 3291 case NORMAL: 3292 for (HRegionLocation loc : connection.locateRegions(tableName, false, false)) { 3293 ServerName sn = loc.getServerName(); 3294 if (sn == null) { 3295 continue; 3296 } 3297 byte[] regionName = loc.getRegion().getRegionName(); 3298 AdminService.BlockingInterface snAdmin = this.connection.getAdmin(sn); 3299 try { 3300 Callable<GetRegionInfoResponse> regionInfoCallable = 3301 new Callable<GetRegionInfoResponse>() { 3302 @Override 3303 public GetRegionInfoResponse call() throws Exception { 3304 GetRegionInfoRequest request = 3305 RequestConverter.buildGetRegionInfoRequest(regionName, true); 3306 return snAdmin.getRegionInfo(rpcController, request); 3307 } 3308 }; 3309 GetRegionInfoResponse response = ProtobufUtil.call(regionInfoCallable); 3310 switch (response.getCompactionState()) { 3311 case MAJOR_AND_MINOR: 3312 return CompactionState.MAJOR_AND_MINOR; 3313 case MAJOR: 3314 if (state == AdminProtos.GetRegionInfoResponse.CompactionState.MINOR) { 3315 return CompactionState.MAJOR_AND_MINOR; 3316 } 3317 state = AdminProtos.GetRegionInfoResponse.CompactionState.MAJOR; 3318 break; 3319 case MINOR: 3320 if (state == AdminProtos.GetRegionInfoResponse.CompactionState.MAJOR) { 3321 return CompactionState.MAJOR_AND_MINOR; 3322 } 3323 state = AdminProtos.GetRegionInfoResponse.CompactionState.MINOR; 3324 break; 3325 case NONE: 3326 default: // nothing, continue 3327 } 3328 } catch (NotServingRegionException e) { 3329 if (LOG.isDebugEnabled()) { 3330 LOG.debug("Trying to get compaction state of " + loc.getRegion() + ": " + 3331 StringUtils.stringifyException(e)); 3332 } 3333 } catch (RemoteException e) { 3334 if (e.getMessage().indexOf(NotServingRegionException.class.getName()) >= 0) { 3335 if (LOG.isDebugEnabled()) { 3336 LOG.debug("Trying to get compaction state of " + loc.getRegion() + ": " + 3337 StringUtils.stringifyException(e)); 3338 } 3339 } else { 3340 throw e; 3341 } 3342 } 3343 } 3344 break; 3345 default: 3346 throw new IllegalArgumentException("Unknown compactType: " + compactType); 3347 } 3348 if (state != null) { 3349 return ProtobufUtil.createCompactionState(state); 3350 } 3351 return null; 3352 } 3353 3354 /** 3355 * Future that waits on a procedure result. 3356 * Returned by the async version of the Admin calls, 3357 * and used internally by the sync calls to wait on the result of the procedure. 3358 */ 3359 @InterfaceAudience.Private 3360 @InterfaceStability.Evolving 3361 protected static class ProcedureFuture<V> implements Future<V> { 3362 private ExecutionException exception = null; 3363 private boolean procResultFound = false; 3364 private boolean done = false; 3365 private boolean cancelled = false; 3366 private V result = null; 3367 3368 private final HBaseAdmin admin; 3369 protected final Long procId; 3370 3371 public ProcedureFuture(final HBaseAdmin admin, final Long procId) { 3372 this.admin = admin; 3373 this.procId = procId; 3374 } 3375 3376 @Override 3377 public boolean cancel(boolean mayInterruptIfRunning) { 3378 AbortProcedureRequest abortProcRequest = AbortProcedureRequest.newBuilder() 3379 .setProcId(procId).setMayInterruptIfRunning(mayInterruptIfRunning).build(); 3380 try { 3381 cancelled = abortProcedureResult(abortProcRequest).getIsProcedureAborted(); 3382 if (cancelled) { 3383 done = true; 3384 } 3385 } catch (IOException e) { 3386 // Cancell thrown exception for some reason. At this time, we are not sure whether 3387 // the cancell succeeds or fails. We assume that it is failed, but print out a warning 3388 // for debugging purpose. 3389 LOG.warn( 3390 "Cancelling the procedure with procId=" + procId + " throws exception " + e.getMessage(), 3391 e); 3392 cancelled = false; 3393 } 3394 return cancelled; 3395 } 3396 3397 @Override 3398 public boolean isCancelled() { 3399 return cancelled; 3400 } 3401 3402 protected AbortProcedureResponse abortProcedureResult( 3403 final AbortProcedureRequest request) throws IOException { 3404 return admin.executeCallable(new MasterCallable<AbortProcedureResponse>( 3405 admin.getConnection(), admin.getRpcControllerFactory()) { 3406 @Override 3407 protected AbortProcedureResponse rpcCall() throws Exception { 3408 return master.abortProcedure(getRpcController(), request); 3409 } 3410 }); 3411 } 3412 3413 @Override 3414 public V get() throws InterruptedException, ExecutionException { 3415 // TODO: should we ever spin forever? 3416 // fix HBASE-21715. TODO: If the function call get() without timeout limit is not allowed, 3417 // is it possible to compose instead of inheriting from the class Future for this class? 3418 try { 3419 return get(admin.getProcedureTimeout, TimeUnit.MILLISECONDS); 3420 } catch (TimeoutException e) { 3421 LOG.warn("Failed to get the procedure with procId=" + procId + " throws exception " + e 3422 .getMessage(), e); 3423 return null; 3424 } 3425 } 3426 3427 @Override 3428 public V get(long timeout, TimeUnit unit) 3429 throws InterruptedException, ExecutionException, TimeoutException { 3430 if (!done) { 3431 long deadlineTs = EnvironmentEdgeManager.currentTime() + unit.toMillis(timeout); 3432 try { 3433 try { 3434 // if the master support procedures, try to wait the result 3435 if (procId != null) { 3436 result = waitProcedureResult(procId, deadlineTs); 3437 } 3438 // if we don't have a proc result, try the compatibility wait 3439 if (!procResultFound) { 3440 result = waitOperationResult(deadlineTs); 3441 } 3442 result = postOperationResult(result, deadlineTs); 3443 done = true; 3444 } catch (IOException e) { 3445 result = postOperationFailure(e, deadlineTs); 3446 done = true; 3447 } 3448 } catch (IOException e) { 3449 exception = new ExecutionException(e); 3450 done = true; 3451 } 3452 } 3453 if (exception != null) { 3454 throw exception; 3455 } 3456 return result; 3457 } 3458 3459 @Override 3460 public boolean isDone() { 3461 return done; 3462 } 3463 3464 protected HBaseAdmin getAdmin() { 3465 return admin; 3466 } 3467 3468 private V waitProcedureResult(long procId, long deadlineTs) 3469 throws IOException, TimeoutException, InterruptedException { 3470 GetProcedureResultRequest request = GetProcedureResultRequest.newBuilder() 3471 .setProcId(procId) 3472 .build(); 3473 3474 int tries = 0; 3475 IOException serviceEx = null; 3476 while (EnvironmentEdgeManager.currentTime() < deadlineTs) { 3477 GetProcedureResultResponse response = null; 3478 try { 3479 // Try to fetch the result 3480 response = getProcedureResult(request); 3481 } catch (IOException e) { 3482 serviceEx = unwrapException(e); 3483 3484 // the master may be down 3485 LOG.warn("failed to get the procedure result procId=" + procId, serviceEx); 3486 3487 // Not much to do, if we have a DoNotRetryIOException 3488 if (serviceEx instanceof DoNotRetryIOException) { 3489 // TODO: looks like there is no way to unwrap this exception and get the proper 3490 // UnsupportedOperationException aside from looking at the message. 3491 // anyway, if we fail here we just failover to the compatibility side 3492 // and that is always a valid solution. 3493 LOG.warn("Proc-v2 is unsupported on this master: " + serviceEx.getMessage(), serviceEx); 3494 procResultFound = false; 3495 return null; 3496 } 3497 } 3498 3499 // If the procedure is no longer running, we should have a result 3500 if (response != null && response.getState() != GetProcedureResultResponse.State.RUNNING) { 3501 procResultFound = response.getState() != GetProcedureResultResponse.State.NOT_FOUND; 3502 return convertResult(response); 3503 } 3504 3505 try { 3506 Thread.sleep(getAdmin().getPauseTime(tries++)); 3507 } catch (InterruptedException e) { 3508 throw new InterruptedException( 3509 "Interrupted while waiting for the result of proc " + procId); 3510 } 3511 } 3512 if (serviceEx != null) { 3513 throw serviceEx; 3514 } else { 3515 throw new TimeoutException("The procedure " + procId + " is still running"); 3516 } 3517 } 3518 3519 private static IOException unwrapException(IOException e) { 3520 if (e instanceof RemoteException) { 3521 return ((RemoteException)e).unwrapRemoteException(); 3522 } 3523 return e; 3524 } 3525 3526 protected GetProcedureResultResponse getProcedureResult(final GetProcedureResultRequest request) 3527 throws IOException { 3528 return admin.executeCallable(new MasterCallable<GetProcedureResultResponse>( 3529 admin.getConnection(), admin.getRpcControllerFactory()) { 3530 @Override 3531 protected GetProcedureResultResponse rpcCall() throws Exception { 3532 return master.getProcedureResult(getRpcController(), request); 3533 } 3534 }); 3535 } 3536 3537 /** 3538 * Convert the procedure result response to a specified type. 3539 * @param response the procedure result object to parse 3540 * @return the result data of the procedure. 3541 */ 3542 protected V convertResult(final GetProcedureResultResponse response) throws IOException { 3543 if (response.hasException()) { 3544 throw ForeignExceptionUtil.toIOException(response.getException()); 3545 } 3546 return null; 3547 } 3548 3549 /** 3550 * Fallback implementation in case the procedure is not supported by the server. 3551 * It should try to wait until the operation is completed. 3552 * @param deadlineTs the timestamp after which this method should throw a TimeoutException 3553 * @return the result data of the operation 3554 */ 3555 protected V waitOperationResult(final long deadlineTs) 3556 throws IOException, TimeoutException { 3557 return null; 3558 } 3559 3560 /** 3561 * Called after the operation is completed and the result fetched. this allows to perform extra 3562 * steps after the procedure is completed. it allows to apply transformations to the result that 3563 * will be returned by get(). 3564 * @param result the result of the procedure 3565 * @param deadlineTs the timestamp after which this method should throw a TimeoutException 3566 * @return the result of the procedure, which may be the same as the passed one 3567 */ 3568 protected V postOperationResult(final V result, final long deadlineTs) 3569 throws IOException, TimeoutException { 3570 return result; 3571 } 3572 3573 /** 3574 * Called after the operation is terminated with a failure. 3575 * this allows to perform extra steps after the procedure is terminated. 3576 * it allows to apply transformations to the result that will be returned by get(). 3577 * The default implementation will rethrow the exception 3578 * @param exception the exception got from fetching the result 3579 * @param deadlineTs the timestamp after which this method should throw a TimeoutException 3580 * @return the result of the procedure, which may be the same as the passed one 3581 */ 3582 protected V postOperationFailure(final IOException exception, final long deadlineTs) 3583 throws IOException, TimeoutException { 3584 throw exception; 3585 } 3586 3587 protected interface WaitForStateCallable { 3588 boolean checkState(int tries) throws IOException; 3589 void throwInterruptedException() throws InterruptedIOException; 3590 void throwTimeoutException(long elapsed) throws TimeoutException; 3591 } 3592 3593 protected void waitForState(final long deadlineTs, final WaitForStateCallable callable) 3594 throws IOException, TimeoutException { 3595 int tries = 0; 3596 IOException serverEx = null; 3597 long startTime = EnvironmentEdgeManager.currentTime(); 3598 while (EnvironmentEdgeManager.currentTime() < deadlineTs) { 3599 serverEx = null; 3600 try { 3601 if (callable.checkState(tries)) { 3602 return; 3603 } 3604 } catch (IOException e) { 3605 serverEx = e; 3606 } 3607 try { 3608 Thread.sleep(getAdmin().getPauseTime(tries++)); 3609 } catch (InterruptedException e) { 3610 callable.throwInterruptedException(); 3611 } 3612 } 3613 if (serverEx != null) { 3614 throw unwrapException(serverEx); 3615 } else { 3616 callable.throwTimeoutException(EnvironmentEdgeManager.currentTime() - startTime); 3617 } 3618 } 3619 } 3620 3621 @InterfaceAudience.Private 3622 @InterfaceStability.Evolving 3623 protected static abstract class TableFuture<V> extends ProcedureFuture<V> { 3624 private final TableName tableName; 3625 3626 public TableFuture(final HBaseAdmin admin, final TableName tableName, final Long procId) { 3627 super(admin, procId); 3628 this.tableName = tableName; 3629 } 3630 3631 @Override 3632 public String toString() { 3633 return getDescription(); 3634 } 3635 3636 /** 3637 * @return the table name 3638 */ 3639 protected TableName getTableName() { 3640 return tableName; 3641 } 3642 3643 /** 3644 * @return the table descriptor 3645 */ 3646 protected TableDescriptor getTableDescriptor() throws IOException { 3647 return getAdmin().getDescriptor(getTableName()); 3648 } 3649 3650 /** 3651 * @return the operation type like CREATE, DELETE, DISABLE etc. 3652 */ 3653 public abstract String getOperationType(); 3654 3655 /** 3656 * @return a description of the operation 3657 */ 3658 protected String getDescription() { 3659 return "Operation: " + getOperationType() + ", " + "Table Name: " + 3660 tableName.getNameWithNamespaceInclAsString() + ", procId: " + procId; 3661 } 3662 3663 protected abstract class TableWaitForStateCallable implements WaitForStateCallable { 3664 @Override 3665 public void throwInterruptedException() throws InterruptedIOException { 3666 throw new InterruptedIOException("Interrupted while waiting for " + getDescription()); 3667 } 3668 3669 @Override 3670 public void throwTimeoutException(long elapsedTime) throws TimeoutException { 3671 throw new TimeoutException( 3672 getDescription() + " has not completed after " + elapsedTime + "ms"); 3673 } 3674 } 3675 3676 @Override 3677 protected V postOperationResult(final V result, final long deadlineTs) 3678 throws IOException, TimeoutException { 3679 LOG.info(getDescription() + " completed"); 3680 return super.postOperationResult(result, deadlineTs); 3681 } 3682 3683 @Override 3684 protected V postOperationFailure(final IOException exception, final long deadlineTs) 3685 throws IOException, TimeoutException { 3686 LOG.info(getDescription() + " failed with " + exception.getMessage()); 3687 return super.postOperationFailure(exception, deadlineTs); 3688 } 3689 3690 protected void waitForTableEnabled(final long deadlineTs) 3691 throws IOException, TimeoutException { 3692 waitForState(deadlineTs, new TableWaitForStateCallable() { 3693 @Override 3694 public boolean checkState(int tries) throws IOException { 3695 try { 3696 if (getAdmin().isTableAvailable(tableName)) { 3697 return true; 3698 } 3699 } catch (TableNotFoundException tnfe) { 3700 LOG.debug("Table " + tableName.getNameWithNamespaceInclAsString() 3701 + " was not enabled, sleeping. tries=" + tries); 3702 } 3703 return false; 3704 } 3705 }); 3706 } 3707 3708 protected void waitForTableDisabled(final long deadlineTs) 3709 throws IOException, TimeoutException { 3710 waitForState(deadlineTs, new TableWaitForStateCallable() { 3711 @Override 3712 public boolean checkState(int tries) throws IOException { 3713 return getAdmin().isTableDisabled(tableName); 3714 } 3715 }); 3716 } 3717 3718 protected void waitTableNotFound(final long deadlineTs) 3719 throws IOException, TimeoutException { 3720 waitForState(deadlineTs, new TableWaitForStateCallable() { 3721 @Override 3722 public boolean checkState(int tries) throws IOException { 3723 return !getAdmin().tableExists(tableName); 3724 } 3725 }); 3726 } 3727 3728 protected void waitForSchemaUpdate(final long deadlineTs) 3729 throws IOException, TimeoutException { 3730 waitForState(deadlineTs, new TableWaitForStateCallable() { 3731 @Override 3732 public boolean checkState(int tries) throws IOException { 3733 return getAdmin().getAlterStatus(tableName).getFirst() == 0; 3734 } 3735 }); 3736 } 3737 3738 protected void waitForAllRegionsOnline(final long deadlineTs, final byte[][] splitKeys) 3739 throws IOException, TimeoutException { 3740 final TableDescriptor desc = getTableDescriptor(); 3741 final AtomicInteger actualRegCount = new AtomicInteger(0); 3742 final MetaTableAccessor.Visitor visitor = new MetaTableAccessor.Visitor() { 3743 @Override 3744 public boolean visit(Result rowResult) throws IOException { 3745 RegionLocations list = MetaTableAccessor.getRegionLocations(rowResult); 3746 if (list == null) { 3747 LOG.warn("No serialized HRegionInfo in " + rowResult); 3748 return true; 3749 } 3750 HRegionLocation l = list.getRegionLocation(); 3751 if (l == null) { 3752 return true; 3753 } 3754 if (!l.getRegionInfo().getTable().equals(desc.getTableName())) { 3755 return false; 3756 } 3757 if (l.getRegionInfo().isOffline() || l.getRegionInfo().isSplit()) return true; 3758 HRegionLocation[] locations = list.getRegionLocations(); 3759 for (HRegionLocation location : locations) { 3760 if (location == null) continue; 3761 ServerName serverName = location.getServerName(); 3762 // Make sure that regions are assigned to server 3763 if (serverName != null && serverName.getHostAndPort() != null) { 3764 actualRegCount.incrementAndGet(); 3765 } 3766 } 3767 return true; 3768 } 3769 }; 3770 3771 int tries = 0; 3772 int numRegs = (splitKeys == null ? 1 : splitKeys.length + 1) * desc.getRegionReplication(); 3773 while (EnvironmentEdgeManager.currentTime() < deadlineTs) { 3774 actualRegCount.set(0); 3775 MetaTableAccessor.scanMetaForTableRegions(getAdmin().getConnection(), visitor, 3776 desc.getTableName()); 3777 if (actualRegCount.get() == numRegs) { 3778 // all the regions are online 3779 return; 3780 } 3781 3782 try { 3783 Thread.sleep(getAdmin().getPauseTime(tries++)); 3784 } catch (InterruptedException e) { 3785 throw new InterruptedIOException("Interrupted when opening" + " regions; " 3786 + actualRegCount.get() + " of " + numRegs + " regions processed so far"); 3787 } 3788 } 3789 throw new TimeoutException("Only " + actualRegCount.get() + " of " + numRegs 3790 + " regions are online; retries exhausted."); 3791 } 3792 } 3793 3794 @InterfaceAudience.Private 3795 @InterfaceStability.Evolving 3796 protected static abstract class NamespaceFuture extends ProcedureFuture<Void> { 3797 private final String namespaceName; 3798 3799 public NamespaceFuture(final HBaseAdmin admin, final String namespaceName, final Long procId) { 3800 super(admin, procId); 3801 this.namespaceName = namespaceName; 3802 } 3803 3804 /** 3805 * @return the namespace name 3806 */ 3807 protected String getNamespaceName() { 3808 return namespaceName; 3809 } 3810 3811 /** 3812 * @return the operation type like CREATE_NAMESPACE, DELETE_NAMESPACE, etc. 3813 */ 3814 public abstract String getOperationType(); 3815 3816 @Override 3817 public String toString() { 3818 return "Operation: " + getOperationType() + ", Namespace: " + getNamespaceName(); 3819 } 3820 } 3821 3822 @InterfaceAudience.Private 3823 @InterfaceStability.Evolving 3824 private static class ReplicationFuture extends ProcedureFuture<Void> { 3825 private final String peerId; 3826 private final Supplier<String> getOperation; 3827 3828 public ReplicationFuture(HBaseAdmin admin, String peerId, Long procId, 3829 Supplier<String> getOperation) { 3830 super(admin, procId); 3831 this.peerId = peerId; 3832 this.getOperation = getOperation; 3833 } 3834 3835 @Override 3836 public String toString() { 3837 return "Operation: " + getOperation.get() + ", peerId: " + peerId; 3838 } 3839 } 3840 3841 @Override 3842 public List<SecurityCapability> getSecurityCapabilities() throws IOException { 3843 try { 3844 return executeCallable(new MasterCallable<List<SecurityCapability>>(getConnection(), 3845 getRpcControllerFactory()) { 3846 @Override 3847 protected List<SecurityCapability> rpcCall() throws Exception { 3848 SecurityCapabilitiesRequest req = SecurityCapabilitiesRequest.newBuilder().build(); 3849 return ProtobufUtil.toSecurityCapabilityList( 3850 master.getSecurityCapabilities(getRpcController(), req).getCapabilitiesList()); 3851 } 3852 }); 3853 } catch (IOException e) { 3854 if (e instanceof RemoteException) { 3855 e = ((RemoteException)e).unwrapRemoteException(); 3856 } 3857 throw e; 3858 } 3859 } 3860 3861 @Override 3862 public boolean splitSwitch(boolean enabled, boolean synchronous) throws IOException { 3863 return splitOrMergeSwitch(enabled, synchronous, MasterSwitchType.SPLIT); 3864 } 3865 3866 @Override 3867 public boolean mergeSwitch(boolean enabled, boolean synchronous) throws IOException { 3868 return splitOrMergeSwitch(enabled, synchronous, MasterSwitchType.MERGE); 3869 } 3870 3871 private boolean splitOrMergeSwitch(boolean enabled, boolean synchronous, 3872 MasterSwitchType switchType) throws IOException { 3873 return executeCallable(new MasterCallable<Boolean>(getConnection(), getRpcControllerFactory()) { 3874 @Override 3875 protected Boolean rpcCall() throws Exception { 3876 MasterProtos.SetSplitOrMergeEnabledResponse response = master.setSplitOrMergeEnabled( 3877 getRpcController(), 3878 RequestConverter.buildSetSplitOrMergeEnabledRequest(enabled, synchronous, switchType)); 3879 return response.getPrevValueList().get(0); 3880 } 3881 }); 3882 } 3883 3884 @Override 3885 public boolean isSplitEnabled() throws IOException { 3886 return executeCallable(new MasterCallable<Boolean>(getConnection(), getRpcControllerFactory()) { 3887 @Override 3888 protected Boolean rpcCall() throws Exception { 3889 return master.isSplitOrMergeEnabled(getRpcController(), 3890 RequestConverter.buildIsSplitOrMergeEnabledRequest(MasterSwitchType.SPLIT)).getEnabled(); 3891 } 3892 }); 3893 } 3894 3895 @Override 3896 public boolean isMergeEnabled() throws IOException { 3897 return executeCallable(new MasterCallable<Boolean>(getConnection(), getRpcControllerFactory()) { 3898 @Override 3899 protected Boolean rpcCall() throws Exception { 3900 return master.isSplitOrMergeEnabled(getRpcController(), 3901 RequestConverter.buildIsSplitOrMergeEnabledRequest(MasterSwitchType.MERGE)).getEnabled(); 3902 } 3903 }); 3904 } 3905 3906 private RpcControllerFactory getRpcControllerFactory() { 3907 return this.rpcControllerFactory; 3908 } 3909 3910 @Override 3911 public void addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled) 3912 throws IOException { 3913 get(addReplicationPeerAsync(peerId, peerConfig, enabled), this.syncWaitTimeout, 3914 TimeUnit.MILLISECONDS); 3915 } 3916 3917 @Override 3918 public Future<Void> addReplicationPeerAsync(String peerId, ReplicationPeerConfig peerConfig, 3919 boolean enabled) throws IOException { 3920 AddReplicationPeerResponse response = executeCallable( 3921 new MasterCallable<AddReplicationPeerResponse>(getConnection(), getRpcControllerFactory()) { 3922 @Override 3923 protected AddReplicationPeerResponse rpcCall() throws Exception { 3924 return master.addReplicationPeer(getRpcController(), 3925 RequestConverter.buildAddReplicationPeerRequest(peerId, peerConfig, enabled)); 3926 } 3927 }); 3928 return new ReplicationFuture(this, peerId, response.getProcId(), () -> "ADD_REPLICATION_PEER"); 3929 } 3930 3931 @Override 3932 public void removeReplicationPeer(String peerId) throws IOException { 3933 get(removeReplicationPeerAsync(peerId), this.syncWaitTimeout, TimeUnit.MILLISECONDS); 3934 } 3935 3936 @Override 3937 public Future<Void> removeReplicationPeerAsync(String peerId) throws IOException { 3938 RemoveReplicationPeerResponse response = 3939 executeCallable(new MasterCallable<RemoveReplicationPeerResponse>(getConnection(), 3940 getRpcControllerFactory()) { 3941 @Override 3942 protected RemoveReplicationPeerResponse rpcCall() throws Exception { 3943 return master.removeReplicationPeer(getRpcController(), 3944 RequestConverter.buildRemoveReplicationPeerRequest(peerId)); 3945 } 3946 }); 3947 return new ReplicationFuture(this, peerId, response.getProcId(), 3948 () -> "REMOVE_REPLICATION_PEER"); 3949 } 3950 3951 @Override 3952 public void enableReplicationPeer(final String peerId) throws IOException { 3953 get(enableReplicationPeerAsync(peerId), this.syncWaitTimeout, TimeUnit.MILLISECONDS); 3954 } 3955 3956 @Override 3957 public Future<Void> enableReplicationPeerAsync(final String peerId) throws IOException { 3958 EnableReplicationPeerResponse response = 3959 executeCallable(new MasterCallable<EnableReplicationPeerResponse>(getConnection(), 3960 getRpcControllerFactory()) { 3961 @Override 3962 protected EnableReplicationPeerResponse rpcCall() throws Exception { 3963 return master.enableReplicationPeer(getRpcController(), 3964 RequestConverter.buildEnableReplicationPeerRequest(peerId)); 3965 } 3966 }); 3967 return new ReplicationFuture(this, peerId, response.getProcId(), 3968 () -> "ENABLE_REPLICATION_PEER"); 3969 } 3970 3971 @Override 3972 public void disableReplicationPeer(final String peerId) throws IOException { 3973 get(disableReplicationPeerAsync(peerId), this.syncWaitTimeout, TimeUnit.MILLISECONDS); 3974 } 3975 3976 @Override 3977 public Future<Void> disableReplicationPeerAsync(final String peerId) throws IOException { 3978 DisableReplicationPeerResponse response = 3979 executeCallable(new MasterCallable<DisableReplicationPeerResponse>(getConnection(), 3980 getRpcControllerFactory()) { 3981 @Override 3982 protected DisableReplicationPeerResponse rpcCall() throws Exception { 3983 return master.disableReplicationPeer(getRpcController(), 3984 RequestConverter.buildDisableReplicationPeerRequest(peerId)); 3985 } 3986 }); 3987 return new ReplicationFuture(this, peerId, response.getProcId(), 3988 () -> "DISABLE_REPLICATION_PEER"); 3989 } 3990 3991 @Override 3992 public ReplicationPeerConfig getReplicationPeerConfig(final String peerId) throws IOException { 3993 return executeCallable(new MasterCallable<ReplicationPeerConfig>(getConnection(), 3994 getRpcControllerFactory()) { 3995 @Override 3996 protected ReplicationPeerConfig rpcCall() throws Exception { 3997 GetReplicationPeerConfigResponse response = master.getReplicationPeerConfig( 3998 getRpcController(), RequestConverter.buildGetReplicationPeerConfigRequest(peerId)); 3999 return ReplicationPeerConfigUtil.convert(response.getPeerConfig()); 4000 } 4001 }); 4002 } 4003 4004 @Override 4005 public void updateReplicationPeerConfig(final String peerId, 4006 final ReplicationPeerConfig peerConfig) throws IOException { 4007 get(updateReplicationPeerConfigAsync(peerId, peerConfig), this.syncWaitTimeout, 4008 TimeUnit.MILLISECONDS); 4009 } 4010 4011 @Override 4012 public Future<Void> updateReplicationPeerConfigAsync(final String peerId, 4013 final ReplicationPeerConfig peerConfig) throws IOException { 4014 UpdateReplicationPeerConfigResponse response = 4015 executeCallable(new MasterCallable<UpdateReplicationPeerConfigResponse>(getConnection(), 4016 getRpcControllerFactory()) { 4017 @Override 4018 protected UpdateReplicationPeerConfigResponse rpcCall() throws Exception { 4019 return master.updateReplicationPeerConfig(getRpcController(), 4020 RequestConverter.buildUpdateReplicationPeerConfigRequest(peerId, peerConfig)); 4021 } 4022 }); 4023 return new ReplicationFuture(this, peerId, response.getProcId(), 4024 () -> "UPDATE_REPLICATION_PEER_CONFIG"); 4025 } 4026 4027 @Override 4028 public void appendReplicationPeerTableCFs(String id, 4029 Map<TableName, List<String>> tableCfs) 4030 throws ReplicationException, IOException { 4031 if (tableCfs == null) { 4032 throw new ReplicationException("tableCfs is null"); 4033 } 4034 ReplicationPeerConfig peerConfig = getReplicationPeerConfig(id); 4035 ReplicationPeerConfig newPeerConfig = 4036 ReplicationPeerConfigUtil.appendTableCFsToReplicationPeerConfig(tableCfs, peerConfig); 4037 updateReplicationPeerConfig(id, newPeerConfig); 4038 } 4039 4040 @Override 4041 public void removeReplicationPeerTableCFs(String id, 4042 Map<TableName, List<String>> tableCfs) 4043 throws ReplicationException, IOException { 4044 if (tableCfs == null) { 4045 throw new ReplicationException("tableCfs is null"); 4046 } 4047 ReplicationPeerConfig peerConfig = getReplicationPeerConfig(id); 4048 ReplicationPeerConfig newPeerConfig = 4049 ReplicationPeerConfigUtil.removeTableCFsFromReplicationPeerConfig(tableCfs, peerConfig, id); 4050 updateReplicationPeerConfig(id, newPeerConfig); 4051 } 4052 4053 @Override 4054 public List<ReplicationPeerDescription> listReplicationPeers() throws IOException { 4055 return listReplicationPeers((Pattern)null); 4056 } 4057 4058 @Override 4059 public List<ReplicationPeerDescription> listReplicationPeers(Pattern pattern) 4060 throws IOException { 4061 return executeCallable(new MasterCallable<List<ReplicationPeerDescription>>(getConnection(), 4062 getRpcControllerFactory()) { 4063 @Override 4064 protected List<ReplicationPeerDescription> rpcCall() throws Exception { 4065 List<ReplicationProtos.ReplicationPeerDescription> peersList = master.listReplicationPeers( 4066 getRpcController(), RequestConverter.buildListReplicationPeersRequest(pattern)) 4067 .getPeerDescList(); 4068 List<ReplicationPeerDescription> result = new ArrayList<>(peersList.size()); 4069 for (ReplicationProtos.ReplicationPeerDescription peer : peersList) { 4070 result.add(ReplicationPeerConfigUtil.toReplicationPeerDescription(peer)); 4071 } 4072 return result; 4073 } 4074 }); 4075 } 4076 4077 @Override 4078 public void decommissionRegionServers(List<ServerName> servers, boolean offload) 4079 throws IOException { 4080 executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) { 4081 @Override 4082 public Void rpcCall() throws ServiceException { 4083 master.decommissionRegionServers(getRpcController(), 4084 RequestConverter.buildDecommissionRegionServersRequest(servers, offload)); 4085 return null; 4086 } 4087 }); 4088 } 4089 4090 @Override 4091 public List<ServerName> listDecommissionedRegionServers() throws IOException { 4092 return executeCallable(new MasterCallable<List<ServerName>>(getConnection(), 4093 getRpcControllerFactory()) { 4094 @Override 4095 public List<ServerName> rpcCall() throws ServiceException { 4096 ListDecommissionedRegionServersRequest req = 4097 ListDecommissionedRegionServersRequest.newBuilder().build(); 4098 List<ServerName> servers = new ArrayList<>(); 4099 for (HBaseProtos.ServerName server : master 4100 .listDecommissionedRegionServers(getRpcController(), req).getServerNameList()) { 4101 servers.add(ProtobufUtil.toServerName(server)); 4102 } 4103 return servers; 4104 } 4105 }); 4106 } 4107 4108 @Override 4109 public void recommissionRegionServer(ServerName server, List<byte[]> encodedRegionNames) 4110 throws IOException { 4111 executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) { 4112 @Override 4113 public Void rpcCall() throws ServiceException { 4114 master.recommissionRegionServer(getRpcController(), 4115 RequestConverter.buildRecommissionRegionServerRequest(server, encodedRegionNames)); 4116 return null; 4117 } 4118 }); 4119 } 4120 4121 @Override 4122 public List<TableCFs> listReplicatedTableCFs() throws IOException { 4123 List<TableCFs> replicatedTableCFs = new ArrayList<>(); 4124 List<TableDescriptor> tables = listTableDescriptors(); 4125 tables.forEach(table -> { 4126 Map<String, Integer> cfs = new HashMap<>(); 4127 Stream.of(table.getColumnFamilies()) 4128 .filter(column -> column.getScope() != HConstants.REPLICATION_SCOPE_LOCAL) 4129 .forEach(column -> { 4130 cfs.put(column.getNameAsString(), column.getScope()); 4131 }); 4132 if (!cfs.isEmpty()) { 4133 replicatedTableCFs.add(new TableCFs(table.getTableName(), cfs)); 4134 } 4135 }); 4136 return replicatedTableCFs; 4137 } 4138 4139 @Override 4140 public void enableTableReplication(final TableName tableName) throws IOException { 4141 if (tableName == null) { 4142 throw new IllegalArgumentException("Table name cannot be null"); 4143 } 4144 if (!tableExists(tableName)) { 4145 throw new TableNotFoundException("Table '" + tableName.getNameAsString() 4146 + "' does not exists."); 4147 } 4148 byte[][] splits = getTableSplits(tableName); 4149 checkAndSyncTableDescToPeers(tableName, splits); 4150 setTableRep(tableName, true); 4151 } 4152 4153 @Override 4154 public void disableTableReplication(final TableName tableName) throws IOException { 4155 if (tableName == null) { 4156 throw new IllegalArgumentException("Table name is null"); 4157 } 4158 if (!tableExists(tableName)) { 4159 throw new TableNotFoundException("Table '" + tableName.getNameAsString() 4160 + "' does not exists."); 4161 } 4162 setTableRep(tableName, false); 4163 } 4164 4165 /** 4166 * Connect to peer and check the table descriptor on peer: 4167 * <ol> 4168 * <li>Create the same table on peer when not exist.</li> 4169 * <li>Throw an exception if the table already has replication enabled on any of the column 4170 * families.</li> 4171 * <li>Throw an exception if the table exists on peer cluster but descriptors are not same.</li> 4172 * </ol> 4173 * @param tableName name of the table to sync to the peer 4174 * @param splits table split keys 4175 * @throws IOException 4176 */ 4177 private void checkAndSyncTableDescToPeers(final TableName tableName, final byte[][] splits) 4178 throws IOException { 4179 List<ReplicationPeerDescription> peers = listReplicationPeers(); 4180 if (peers == null || peers.size() <= 0) { 4181 throw new IllegalArgumentException("Found no peer cluster for replication."); 4182 } 4183 4184 for (ReplicationPeerDescription peerDesc : peers) { 4185 if (peerDesc.getPeerConfig().needToReplicate(tableName)) { 4186 Configuration peerConf = 4187 ReplicationPeerConfigUtil.getPeerClusterConfiguration(this.conf, peerDesc); 4188 try (Connection conn = ConnectionFactory.createConnection(peerConf); 4189 Admin repHBaseAdmin = conn.getAdmin()) { 4190 TableDescriptor tableDesc = getDescriptor(tableName); 4191 TableDescriptor peerTableDesc = null; 4192 if (!repHBaseAdmin.tableExists(tableName)) { 4193 repHBaseAdmin.createTable(tableDesc, splits); 4194 } else { 4195 peerTableDesc = repHBaseAdmin.getDescriptor(tableName); 4196 if (peerTableDesc == null) { 4197 throw new IllegalArgumentException("Failed to get table descriptor for table " 4198 + tableName.getNameAsString() + " from peer cluster " + peerDesc.getPeerId()); 4199 } 4200 if (TableDescriptor.COMPARATOR_IGNORE_REPLICATION.compare(peerTableDesc, 4201 tableDesc) != 0) { 4202 throw new IllegalArgumentException("Table " + tableName.getNameAsString() 4203 + " exists in peer cluster " + peerDesc.getPeerId() 4204 + ", but the table descriptors are not same when compared with source cluster." 4205 + " Thus can not enable the table's replication switch."); 4206 } 4207 } 4208 } 4209 } 4210 } 4211 } 4212 4213 /** 4214 * Set the table's replication switch if the table's replication switch is already not set. 4215 * @param tableName name of the table 4216 * @param enableRep is replication switch enable or disable 4217 * @throws IOException if a remote or network exception occurs 4218 */ 4219 private void setTableRep(final TableName tableName, boolean enableRep) throws IOException { 4220 TableDescriptor tableDesc = getDescriptor(tableName); 4221 if (!tableDesc.matchReplicationScope(enableRep)) { 4222 int scope = 4223 enableRep ? HConstants.REPLICATION_SCOPE_GLOBAL : HConstants.REPLICATION_SCOPE_LOCAL; 4224 modifyTable(TableDescriptorBuilder.newBuilder(tableDesc).setReplicationScope(scope).build()); 4225 } 4226 } 4227 4228 @Override 4229 public void clearCompactionQueues(final ServerName sn, final Set<String> queues) 4230 throws IOException, InterruptedException { 4231 if (queues == null || queues.size() == 0) { 4232 throw new IllegalArgumentException("queues cannot be null or empty"); 4233 } 4234 final AdminService.BlockingInterface admin = this.connection.getAdmin(sn); 4235 Callable<Void> callable = new Callable<Void>() { 4236 @Override 4237 public Void call() throws Exception { 4238 // TODO: There is no timeout on this controller. Set one! 4239 HBaseRpcController controller = rpcControllerFactory.newController(); 4240 ClearCompactionQueuesRequest request = 4241 RequestConverter.buildClearCompactionQueuesRequest(queues); 4242 admin.clearCompactionQueues(controller, request); 4243 return null; 4244 } 4245 }; 4246 ProtobufUtil.call(callable); 4247 } 4248 4249 @Override 4250 public List<ServerName> clearDeadServers(List<ServerName> servers) throws IOException { 4251 return executeCallable(new MasterCallable<List<ServerName>>(getConnection(), 4252 getRpcControllerFactory()) { 4253 @Override 4254 protected List<ServerName> rpcCall() throws Exception { 4255 ClearDeadServersRequest req = RequestConverter. 4256 buildClearDeadServersRequest(servers == null? Collections.EMPTY_LIST: servers); 4257 return ProtobufUtil.toServerNameList( 4258 master.clearDeadServers(getRpcController(), req).getServerNameList()); 4259 } 4260 }); 4261 } 4262 4263 @Override 4264 public void cloneTableSchema(final TableName tableName, final TableName newTableName, 4265 final boolean preserveSplits) throws IOException { 4266 checkTableExists(tableName); 4267 if (tableExists(newTableName)) { 4268 throw new TableExistsException(newTableName); 4269 } 4270 TableDescriptor htd = TableDescriptorBuilder.copy(newTableName, getTableDescriptor(tableName)); 4271 if (preserveSplits) { 4272 createTable(htd, getTableSplits(tableName)); 4273 } else { 4274 createTable(htd); 4275 } 4276 } 4277}