001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import java.io.IOException; 021import java.util.ArrayList; 022import java.util.List; 023 024import org.apache.hadoop.hbase.HConstants; 025import org.apache.hadoop.hbase.TableName; 026import org.apache.hadoop.hbase.client.Table; 027import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel; 028import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils; 029import org.apache.hadoop.hbase.ipc.ServerRpcController; 030import org.apache.hadoop.hbase.protobuf.ProtobufUtil; 031import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; 032import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CleanupBulkLoadRequest; 033import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CleanupBulkLoadResponse; 034import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.DelegationToken; 035import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.PrepareBulkLoadRequest; 036import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.PrepareBulkLoadResponse; 037import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos; 038import org.apache.hadoop.hbase.util.ByteStringer; 039import org.apache.hadoop.hbase.util.Pair; 040import org.apache.hadoop.security.token.Token; 041import org.apache.yetus.audience.InterfaceAudience; 042 043/** 044 * Client proxy for SecureBulkLoadProtocol used in conjunction with SecureBulkLoadEndpoint 045 * @deprecated Use for backward compatibility testing only. Will be removed when 046 * SecureBulkLoadEndpoint is not supported. 047 */ 048@Deprecated 049@InterfaceAudience.Private 050public class SecureBulkLoadEndpointClient { 051 private Table table; 052 053 public SecureBulkLoadEndpointClient(Table table) { 054 this.table = table; 055 } 056 057 public String prepareBulkLoad(final TableName tableName) throws IOException { 058 try { 059 CoprocessorRpcChannel channel = table.coprocessorService(HConstants.EMPTY_START_ROW); 060 SecureBulkLoadProtos.SecureBulkLoadService instance = 061 ProtobufUtil.newServiceStub(SecureBulkLoadProtos.SecureBulkLoadService.class, channel); 062 063 ServerRpcController controller = new ServerRpcController(); 064 065 CoprocessorRpcUtils.BlockingRpcCallback<PrepareBulkLoadResponse> rpcCallback = 066 new CoprocessorRpcUtils.BlockingRpcCallback<>(); 067 068 PrepareBulkLoadRequest request = 069 PrepareBulkLoadRequest.newBuilder() 070 .setTableName(ProtobufUtil.toProtoTableName(tableName)).build(); 071 072 instance.prepareBulkLoad(controller, request, rpcCallback); 073 074 PrepareBulkLoadResponse response = rpcCallback.get(); 075 if (controller.failedOnException()) { 076 throw controller.getFailedOn(); 077 } 078 079 return response.getBulkToken(); 080 } catch (Throwable throwable) { 081 throw new IOException(throwable); 082 } 083 } 084 085 public void cleanupBulkLoad(final String bulkToken) throws IOException { 086 try { 087 CoprocessorRpcChannel channel = table.coprocessorService(HConstants.EMPTY_START_ROW); 088 SecureBulkLoadProtos.SecureBulkLoadService instance = 089 ProtobufUtil.newServiceStub(SecureBulkLoadProtos.SecureBulkLoadService.class, channel); 090 091 ServerRpcController controller = new ServerRpcController(); 092 093 CoprocessorRpcUtils.BlockingRpcCallback<CleanupBulkLoadResponse> rpcCallback = 094 new CoprocessorRpcUtils.BlockingRpcCallback<>(); 095 096 CleanupBulkLoadRequest request = 097 CleanupBulkLoadRequest.newBuilder() 098 .setBulkToken(bulkToken).build(); 099 100 instance.cleanupBulkLoad(controller, 101 request, 102 rpcCallback); 103 104 if (controller.failedOnException()) { 105 throw controller.getFailedOn(); 106 } 107 } catch (Throwable throwable) { 108 throw new IOException(throwable); 109 } 110 } 111 112 public boolean bulkLoadHFiles(final List<Pair<byte[], String>> familyPaths, 113 final Token<?> userToken, final String bulkToken, final byte[] startRow) 114 throws IOException { 115 // we never want to send a batch of HFiles to all regions, thus cannot call 116 // HTable#coprocessorService methods that take start and end rowkeys; see HBASE-9639 117 try { 118 CoprocessorRpcChannel channel = table.coprocessorService(startRow); 119 SecureBulkLoadProtos.SecureBulkLoadService instance = 120 ProtobufUtil.newServiceStub(SecureBulkLoadProtos.SecureBulkLoadService.class, channel); 121 122 DelegationToken protoDT = 123 DelegationToken.newBuilder().build(); 124 if(userToken != null) { 125 protoDT = 126 DelegationToken.newBuilder() 127 .setIdentifier(ByteStringer.wrap(userToken.getIdentifier())) 128 .setPassword(ByteStringer.wrap(userToken.getPassword())) 129 .setKind(userToken.getKind().toString()) 130 .setService(userToken.getService().toString()).build(); 131 } 132 133 List<ClientProtos.BulkLoadHFileRequest.FamilyPath> protoFamilyPaths = 134 new ArrayList<>(familyPaths.size()); 135 for(Pair<byte[], String> el: familyPaths) { 136 protoFamilyPaths.add(ClientProtos.BulkLoadHFileRequest.FamilyPath.newBuilder() 137 .setFamily(ByteStringer.wrap(el.getFirst())) 138 .setPath(el.getSecond()).build()); 139 } 140 141 SecureBulkLoadProtos.SecureBulkLoadHFilesRequest request = 142 SecureBulkLoadProtos.SecureBulkLoadHFilesRequest.newBuilder() 143 .setFsToken(protoDT) 144 .addAllFamilyPath(protoFamilyPaths) 145 .setBulkToken(bulkToken).build(); 146 147 ServerRpcController controller = new ServerRpcController(); 148 CoprocessorRpcUtils.BlockingRpcCallback<SecureBulkLoadProtos.SecureBulkLoadHFilesResponse> 149 rpcCallback = new CoprocessorRpcUtils.BlockingRpcCallback<>(); 150 instance.secureBulkLoadHFiles(controller, 151 request, 152 rpcCallback); 153 154 SecureBulkLoadProtos.SecureBulkLoadHFilesResponse response = rpcCallback.get(); 155 if (controller.failedOnException()) { 156 throw controller.getFailedOn(); 157 } 158 return response.getLoaded(); 159 } catch (Throwable throwable) { 160 throw new IOException(throwable); 161 } 162 } 163}