/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import java.net.SocketAddress;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
import org.apache.hadoop.security.token.Token;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.BootstrapNodeProtos.BootstrapNodeService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.BootstrapNodeProtos.GetAllBootstrapNodesRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLiveRegionServersRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStatusService;

/**
 * The implementation of AsyncClusterConnection.
 * <p>
 * Extends {@link AsyncConnectionImpl} and adds the operations declared by
 * {@link AsyncClusterConnection}: region flush, region location lookup, the bulk load RPCs,
 * live region server / bootstrap node queries and replication of WAL entries to a replica.
 */
@InterfaceAudience.Private
class AsyncClusterConnectionImpl extends AsyncConnectionImpl implements AsyncClusterConnection {

  /**
   * Creates the connection by delegating to the {@link AsyncConnectionImpl} constructor, passing
   * an empty map as its last argument.
   * @param conf         the configuration handed to the superclass
   * @param registry     the connection registry handed to the superclass
   * @param clusterId    the cluster id handed to the superclass
   * @param localAddress the local socket address handed to the superclass
   * @param user         the user handed to the superclass
   */
  public AsyncClusterConnectionImpl(Configuration conf, ConnectionRegistry registry,
    String clusterId, SocketAddress localAddress, User user) {
    super(conf, registry, clusterId, localAddress, user, Collections.emptyMap());
  }

  /**
   * Returns the nonce generator of the parent connection.
   * @return whatever {@code super.getNonceGenerator()} returns
   */
  @Override
  public NonceGenerator getNonceGenerator() {
    // Pure delegation; presumably re-declared as public to satisfy the interface - TODO confirm.
    return super.getNonceGenerator();
  }

  /**
   * Returns the rpc client held by this connection.
   * @return the {@code rpcClient} field
   */
  @Override
  public RpcClient getRpcClient() {
    return rpcClient;
  }

  /**
   * Creates an admin wrapper for talking to the given region server.
   * @param serverName the region server to address
   * @return a new {@link AsyncRegionServerAdmin} bound to {@code serverName} and this connection
   */
  @Override
  public AsyncRegionServerAdmin getRegionServerAdmin(ServerName serverName) {
    return new AsyncRegionServerAdmin(serverName, this);
  }

  /**
   * Flushes the given region through the admin obtained from {@code getAdmin()}.
   * @param regionName          the name of the region to flush
   * @param writeFlushWALMarker forwarded unchanged to {@code flushRegionInternal}
   * @return the future returned by {@code RawAsyncHBaseAdmin#flushRegionInternal}
   */
  @Override
  public CompletableFuture<FlushRegionResponse> flush(byte[] regionName,
    boolean writeFlushWALMarker) {
    // The admin is cast to RawAsyncHBaseAdmin to reach flushRegionInternal; the second argument
    // is always null here.
    return ((RawAsyncHBaseAdmin) getAdmin()).flushRegionInternal(regionName, null,
      writeFlushWALMarker);
  }

  /**
   * Looks up the region locations for a row of a table.
   * @param tableName the table to look in
   * @param row       the row to locate
   * @param reload    forwarded unchanged to the locator
   * @return the future returned by the locator, queried with {@link RegionLocateType#CURRENT}
   *         and {@code -1L} as the last argument
   */
  @Override
  public CompletableFuture<RegionLocations> getRegionLocations(TableName tableName, byte[] row,
    boolean reload) {
    return getLocator().getRegionLocations(tableName, row, RegionLocateType.CURRENT, reload, -1L);
  }

  /**
   * Sends a {@link PrepareBulkLoadRequest} for the given table, addressed via
   * {@link HConstants#EMPTY_START_ROW}.
   * @param tableName the table a bulk load is being prepared for
   * @return a future completed with the bulk token taken from the response
   */
  @Override
  public CompletableFuture<String> prepareBulkLoad(TableName tableName) {
    return callerFactory.<String> single().table(tableName).row(HConstants.EMPTY_START_ROW)
      .action((rpcController, location, clientStub) -> ConnectionUtils.<TableName,
        PrepareBulkLoadRequest, PrepareBulkLoadResponse, String> call(rpcController, location,
          clientStub, tableName, (regionBytes, table) -> {
            // Build the request from the region name supplied by the caller framework.
            RegionSpecifier specifier =
              RequestConverter.buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionBytes);
            return PrepareBulkLoadRequest.newBuilder()
              .setTableName(ProtobufUtil.toProtoTableName(table)).setRegion(specifier).build();
          }, (service, ctrl, request, callback) -> service.prepareBulkLoad(ctrl, request, callback),
          (ctrl, response) -> response.getBulkToken()))
      .call();
  }

  /**
   * Sends a {@link BulkLoadHFileRequest} to the region located by {@code row} in the given
   * table.
   * @param tableName    the table to load into
   * @param familyPaths  forwarded to {@code RequestConverter.buildBulkLoadHFileRequest}
   * @param row          the row used to address the request
   * @param assignSeqNum forwarded to the request builder
   * @param userToken    forwarded to the request builder
   * @param bulkToken    forwarded to the request builder
   * @param copyFiles    forwarded to the request builder
   * @param clusterIds   forwarded to the request builder
   * @param replicate    forwarded to the request builder
   * @return a future completed with the {@code loaded} flag of the response
   */
  @Override
  public CompletableFuture<Boolean> bulkLoad(TableName tableName,
    List<Pair<byte[], String>> familyPaths, byte[] row, boolean assignSeqNum, Token<?> userToken,
    String bulkToken, boolean copyFiles, List<String> clusterIds, boolean replicate) {
    return callerFactory.<Boolean> single().table(tableName).row(row)
      .action((rpcController, location, clientStub) -> ConnectionUtils.<Void,
        BulkLoadHFileRequest, BulkLoadHFileResponse, Boolean> call(rpcController, location,
          clientStub, null,
          // The request payload slot is unused (null); everything comes from the method args.
          (regionBytes, unused) -> RequestConverter.buildBulkLoadHFileRequest(familyPaths,
            regionBytes, assignSeqNum, userToken, bulkToken, copyFiles, clusterIds, replicate),
          (service, ctrl, request, callback) -> service.bulkLoadHFile(ctrl, request, callback),
          (ctrl, response) -> response.getLoaded()))
      .call();
  }

  /**
   * Sends a {@link CleanupBulkLoadRequest} carrying the given bulk token, addressed via
   * {@link HConstants#EMPTY_START_ROW} of the given table.
   * @param tableName the table the bulk load belonged to
   * @param bulkToken the token placed into the request
   * @return a future whose value is always {@code null}; the response content is ignored
   */
  @Override
  public CompletableFuture<Void> cleanupBulkLoad(TableName tableName, String bulkToken) {
    return callerFactory.<Void> single().table(tableName).row(HConstants.EMPTY_START_ROW)
      .action((rpcController, location, clientStub) -> ConnectionUtils.<String,
        CleanupBulkLoadRequest, CleanupBulkLoadResponse, Void> call(rpcController, location,
          clientStub, bulkToken, (regionBytes, token) -> {
            RegionSpecifier specifier =
              RequestConverter.buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionBytes);
            return CleanupBulkLoadRequest.newBuilder().setRegion(specifier).setBulkToken(token)
              .build();
          }, (service, ctrl, request, callback) -> service.cleanupBulkLoad(ctrl, request, callback),
          (ctrl, response) -> null))
      .call();
  }

  /**
   * Calls {@code getLiveRegionServers} on a {@link RegionServerStatusService} stub created over
   * a channel to the address reported by the given tracker.
   * @param masterAddrTracker supplies the master address the channel is opened to
   * @param count             set as the {@code count} field of the request
   * @return a future completed with the servers of the response converted through
   *         {@code ProtobufUtil::toServerName}, or completed exceptionally with the
   *         controller's failure when the controller reports one
   */
  @Override
  public CompletableFuture<List<ServerName>>
    getLiveRegionServers(MasterAddressTracker masterAddrTracker, int count) {
    CompletableFuture<List<ServerName>> result = new CompletableFuture<>();
    // NOTE(review): if getMasterAddress() can return null, channel creation may throw
    // synchronously instead of failing the future - confirm against MasterAddressTracker.
    RegionServerStatusService.Interface masterStub = RegionServerStatusService
      .newStub(rpcClient.createRpcChannel(masterAddrTracker.getMasterAddress(), user, rpcTimeout));
    HBaseRpcController rpcController = rpcControllerFactory.newController();
    GetLiveRegionServersRequest request =
      GetLiveRegionServersRequest.newBuilder().setCount(count).build();
    masterStub.getLiveRegionServers(rpcController, request, response -> {
      if (rpcController.failed()) {
        result.completeExceptionally(rpcController.getFailed());
        return;
      }
      List<ServerName> liveServers = response.getServerList().stream()
        .map(ProtobufUtil::toServerName).collect(Collectors.toList());
      result.complete(liveServers);
    });
    return result;
  }

  /**
   * Calls {@code getAllBootstrapNodes} on a {@link BootstrapNodeService} stub created over a
   * channel to the given region server.
   * @param regionServer the server the channel is opened to
   * @return a future completed with the nodes of the response converted through
   *         {@code ProtobufUtil::toServerName}, or completed exceptionally with the
   *         controller's failure when the controller reports one
   */
  @Override
  public CompletableFuture<List<ServerName>> getAllBootstrapNodes(ServerName regionServer) {
    CompletableFuture<List<ServerName>> result = new CompletableFuture<>();
    BootstrapNodeService.Interface nodeStub =
      BootstrapNodeService.newStub(rpcClient.createRpcChannel(regionServer, user, rpcTimeout));
    HBaseRpcController rpcController = rpcControllerFactory.newController();
    GetAllBootstrapNodesRequest request = GetAllBootstrapNodesRequest.getDefaultInstance();
    nodeStub.getAllBootstrapNodes(rpcController, request, response -> {
      if (rpcController.failed()) {
        result.completeExceptionally(rpcController.getFailed());
        return;
      }
      List<ServerName> bootstrapNodes = response.getNodeList().stream()
        .map(ProtobufUtil::toServerName).collect(Collectors.toList());
      result.complete(bootstrapNodes);
    });
    return result;
  }

  /**
   * Replicates the given WAL entries to a region replica through an
   * {@link AsyncRegionReplicationRetryingCaller}.
   * @param replica            the replica region to send the entries to
   * @param entries            the WAL entries to replicate
   * @param retries            converted with {@code ConnectionUtils.retries2Attempts} before
   *                           being handed to the caller
   * @param rpcTimeoutNs       forwarded unchanged to the caller
   * @param operationTimeoutNs forwarded unchanged to the caller
   * @return the future returned by the retrying caller's {@code call()}
   */
  @Override
  public CompletableFuture<Void> replicate(RegionInfo replica, List<Entry> entries, int retries,
    long rpcTimeoutNs, long operationTimeoutNs) {
    AsyncRegionReplicationRetryingCaller caller = new AsyncRegionReplicationRetryingCaller(
      RETRY_TIMER, this, ConnectionUtils.retries2Attempts(retries), rpcTimeoutNs,
      operationTimeoutNs, replica, entries);
    return caller.call();
  }
}