/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import java.net.SocketAddress;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
import org.apache.hadoop.security.token.Token;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.BootstrapNodeProtos.BootstrapNodeService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.BootstrapNodeProtos.GetAllBootstrapNodesRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLiveRegionServersRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStatusService;

/**
 * The implementation of {@link AsyncClusterConnection}.
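 * <p>
 * A rough usage sketch of the bulk load related methods (illustrative only; {@code conn},
 * {@code tableName}, {@code familyPaths}, {@code row}, {@code userToken} and {@code clusterIds}
 * are assumed to be supplied by the caller):
 * </p>
 *
 * <pre>
 * // prepare a session, load the HFiles, then clean the session up
 * conn.prepareBulkLoad(tableName).thenCompose(token -&gt; conn
 *   .bulkLoad(tableName, familyPaths, row, true, userToken, token, false, clusterIds, false)
 *   .thenCompose(loaded -&gt; conn.cleanupBulkLoad(tableName, token)));
 * </pre>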
 */
@InterfaceAudience.Private
class AsyncClusterConnectionImpl extends AsyncConnectionImpl implements AsyncClusterConnection {

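  /**
   * Delegates straight to {@link AsyncConnectionImpl}, with {@link Collections#emptyMap()} for the
   * final constructor argument.
   */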
  public AsyncClusterConnectionImpl(Configuration conf, ConnectionRegistry registry,
    String clusterId, SocketAddress localAddress, User user) {
    super(conf, registry, clusterId, localAddress, user, Collections.emptyMap());
  }

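  /**
   * Delegates to the superclass implementation; overridden only so the nonce generator is exposed
   * through the {@link AsyncClusterConnection} interface.
   */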
  @Override
  public NonceGenerator getNonceGenerator() {
    return super.getNonceGenerator();
  }

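  /** Expose the {@link RpcClient} backing this connection. */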
  @Override
  public RpcClient getRpcClient() {
    return rpcClient;
  }

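  /** Create an {@link AsyncRegionServerAdmin} for issuing admin RPCs against the given server. */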
  @Override
  public AsyncRegionServerAdmin getRegionServerAdmin(ServerName serverName) {
    return new AsyncRegionServerAdmin(serverName, this);
  }

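  /**
   * Flush the whole region (no column family restriction, hence the {@code null}) through the
   * internal admin method, which exposes the {@code writeFlushWALMarker} flag.
   */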
  @Override
  public CompletableFuture<FlushRegionResponse> flush(byte[] regionName,
    boolean writeFlushWALMarker) {
    RawAsyncHBaseAdmin admin = (RawAsyncHBaseAdmin) getAdmin();
    return admin.flushRegionInternal(regionName, null, writeFlushWALMarker);
  }

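  /**
   * Locate the regions containing the given row; {@code reload} skips the location cache and asks
   * for a fresh lookup.
   */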
  @Override
  public CompletableFuture<RegionLocations> getRegionLocations(TableName tableName, byte[] row,
    boolean reload) {
    return getLocator().getRegionLocations(tableName, row, RegionLocateType.CURRENT, reload, -1L);
  }

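  /**
   * Open a bulk load session against the region holding the table's first row and return the bulk
   * token that identifies it.
   */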
  @Override
  public CompletableFuture<String> prepareBulkLoad(TableName tableName) {
    return callerFactory.<String> single().table(tableName).row(HConstants.EMPTY_START_ROW)
      .action((controller, loc, stub) -> ConnectionUtils.<TableName, PrepareBulkLoadRequest,
        PrepareBulkLoadResponse, String> call(controller, loc, stub, tableName, (rn, tn) -> {
          RegionSpecifier region =
            RequestConverter.buildRegionSpecifier(RegionSpecifierType.REGION_NAME, rn);
          return PrepareBulkLoadRequest.newBuilder().setTableName(ProtobufUtil.toProtoTableName(tn))
            .setRegion(region).build();
        }, (s, c, req, done) -> s.prepareBulkLoad(c, req, done), (c, resp) -> resp.getBulkToken()))
      .call();
  }

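  /**
   * Send the family/HFile path pairs to the region containing {@code row}; the returned future
   * reports whether the region server loaded the files.
   */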
  @Override
  public CompletableFuture<Boolean> bulkLoad(TableName tableName,
    List<Pair<byte[], String>> familyPaths, byte[] row, boolean assignSeqNum, Token<?> userToken,
    String bulkToken, boolean copyFiles, List<String> clusterIds, boolean replicate) {
    return callerFactory.<Boolean> single().table(tableName).row(row)
      .action((controller, loc, stub) -> ConnectionUtils.<Void, BulkLoadHFileRequest,
        BulkLoadHFileResponse, Boolean> call(controller, loc, stub, null,
          (rn, nil) -> RequestConverter.buildBulkLoadHFileRequest(familyPaths, rn, assignSeqNum,
            userToken, bulkToken, copyFiles, clusterIds, replicate),
          (s, c, req, done) -> s.bulkLoadHFile(c, req, done), (c, resp) -> resp.getLoaded()))
      .call();
  }

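  /**
   * Close the bulk load session identified by {@code bulkToken}, again routed through the region
   * holding the table's first row.
   */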
  @Override
  public CompletableFuture<Void> cleanupBulkLoad(TableName tableName, String bulkToken) {
    return callerFactory.<Void> single().table(tableName).row(HConstants.EMPTY_START_ROW)
      .action((controller, loc, stub) -> ConnectionUtils.<String, CleanupBulkLoadRequest,
        CleanupBulkLoadResponse, Void> call(controller, loc, stub, bulkToken, (rn, bt) -> {
          RegionSpecifier region =
            RequestConverter.buildRegionSpecifier(RegionSpecifierType.REGION_NAME, rn);
          return CleanupBulkLoadRequest.newBuilder().setRegion(region).setBulkToken(bt).build();
        }, (s, c, req, done) -> s.cleanupBulkLoad(c, req, done), (c, resp) -> null))
      .call();
  }

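  /**
   * Ask the active master, found via the {@link MasterAddressTracker}, for {@code count} live
   * region servers. The RPC goes through a directly created {@link RegionServerStatusService} stub
   * instead of the usual caller factory.
   */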
  @Override
  public CompletableFuture<List<ServerName>>
    getLiveRegionServers(MasterAddressTracker masterAddrTracker, int count) {
    CompletableFuture<List<ServerName>> future = new CompletableFuture<>();
    RegionServerStatusService.Interface stub = RegionServerStatusService
      .newStub(rpcClient.createRpcChannel(masterAddrTracker.getMasterAddress(), user, rpcTimeout));
    HBaseRpcController controller = rpcControllerFactory.newController();
    stub.getLiveRegionServers(controller,
      GetLiveRegionServersRequest.newBuilder().setCount(count).build(), resp -> {
        if (controller.failed()) {
          future.completeExceptionally(controller.getFailed());
        } else {
          future.complete(resp.getServerList().stream().map(ProtobufUtil::toServerName)
            .collect(Collectors.toList()));
        }
      });
    return future;
  }

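  /**
   * Fetch the bootstrap node list from the given region server via its
   * {@link BootstrapNodeService} endpoint.
   */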
  @Override
  public CompletableFuture<List<ServerName>> getAllBootstrapNodes(ServerName regionServer) {
    CompletableFuture<List<ServerName>> future = new CompletableFuture<>();
    BootstrapNodeService.Interface stub =
      BootstrapNodeService.newStub(rpcClient.createRpcChannel(regionServer, user, rpcTimeout));
    HBaseRpcController controller = rpcControllerFactory.newController();
    stub.getAllBootstrapNodes(controller, GetAllBootstrapNodesRequest.getDefaultInstance(),
      resp -> {
        if (controller.failed()) {
          future.completeExceptionally(controller.getFailed());
        } else {
          future.complete(resp.getNodeList().stream().map(ProtobufUtil::toServerName)
            .collect(Collectors.toList()));
        }
      });
    return future;
  }

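  /**
   * Replay the given WAL entries to the region replica using a retrying caller bounded by the
   * supplied retry count and timeouts.
   */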
  @Override
  public CompletableFuture<Void> replicate(RegionInfo replica, List<Entry> entries, int retries,
    long rpcTimeoutNs, long operationTimeoutNs) {
    return new AsyncRegionReplicationRetryingCaller(RETRY_TIMER, this,
      ConnectionUtils.retries2Attempts(retries), rpcTimeoutNs, operationTimeoutNs, replica, entries)
        .call();
  }
}