View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.client.coprocessor;
20  
21  import static org.apache.hadoop.hbase.HConstants.EMPTY_START_ROW;
22  import static org.apache.hadoop.hbase.HConstants.LAST_ROW;
23  
24  import com.google.protobuf.HBaseZeroCopyByteString;
25  
26  import org.apache.hadoop.classification.InterfaceAudience;
27  import org.apache.hadoop.fs.Path;
28  import org.apache.hadoop.hbase.TableName;
29  import org.apache.hadoop.hbase.client.HTable;
30  import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
31  import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
32  import org.apache.hadoop.hbase.ipc.ServerRpcController;
33  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
34  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
35  import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos;
36  import org.apache.hadoop.hbase.security.SecureBulkLoadUtil;
37  import org.apache.hadoop.hbase.util.Pair;
38  import org.apache.hadoop.security.token.Token;
39  
40  import java.io.IOException;
41  import java.util.ArrayList;
42  import java.util.List;
43  
44  /**
45   * Client proxy for SecureBulkLoadProtocol
46   * used in conjunction with SecureBulkLoadEndpoint
47   */
48  @InterfaceAudience.Private
49  public class SecureBulkLoadClient {
50    private HTable table;
51  
52    public SecureBulkLoadClient(HTable table) {
53      this.table = table;
54    }
55  
56    public String prepareBulkLoad(final TableName tableName) throws IOException {
57      try {
58        return
59          table.coprocessorService(SecureBulkLoadProtos.SecureBulkLoadService.class,
60            EMPTY_START_ROW,
61            LAST_ROW,
62            new Batch.Call<SecureBulkLoadProtos.SecureBulkLoadService,String>() {
63              @Override
64              public String call(SecureBulkLoadProtos.SecureBulkLoadService instance) throws IOException {
65                ServerRpcController controller = new ServerRpcController();
66  
67                BlockingRpcCallback<SecureBulkLoadProtos.PrepareBulkLoadResponse> rpcCallback =
68                    new BlockingRpcCallback<SecureBulkLoadProtos.PrepareBulkLoadResponse>();
69  
70                SecureBulkLoadProtos.PrepareBulkLoadRequest request =
71                    SecureBulkLoadProtos.PrepareBulkLoadRequest.newBuilder()
72                    .setTableName(ProtobufUtil.toProtoTableName(tableName)).build();
73  
74                instance.prepareBulkLoad(controller,
75                    request,
76                    rpcCallback);
77  
78                SecureBulkLoadProtos.PrepareBulkLoadResponse response = rpcCallback.get();
79                if (controller.failedOnException()) {
80                  throw controller.getFailedOn();
81                }
82                return response.getBulkToken();
83              }
84            }).entrySet().iterator().next().getValue();
85      } catch (Throwable throwable) {
86        throw new IOException(throwable);
87      }
88    }
89  
90    public void cleanupBulkLoad(final String bulkToken) throws IOException {
91      try {
92          table.coprocessorService(SecureBulkLoadProtos.SecureBulkLoadService.class,
93              EMPTY_START_ROW,
94              LAST_ROW,
95              new Batch.Call<SecureBulkLoadProtos.SecureBulkLoadService, String>() {
96  
97                @Override
98                public String call(SecureBulkLoadProtos.SecureBulkLoadService instance) throws IOException {
99                  ServerRpcController controller = new ServerRpcController();
100 
101                 BlockingRpcCallback<SecureBulkLoadProtos.CleanupBulkLoadResponse> rpcCallback =
102                     new BlockingRpcCallback<SecureBulkLoadProtos.CleanupBulkLoadResponse>();
103 
104                 SecureBulkLoadProtos.CleanupBulkLoadRequest request =
105                     SecureBulkLoadProtos.CleanupBulkLoadRequest.newBuilder()
106                         .setBulkToken(bulkToken).build();
107 
108                 instance.cleanupBulkLoad(controller,
109                     request,
110                     rpcCallback);
111 
112                 if (controller.failedOnException()) {
113                   throw controller.getFailedOn();
114                 }
115                 return null;
116               }
117             });
118     } catch (Throwable throwable) {
119       throw new IOException(throwable);
120     }
121   }
122 
123   public boolean bulkLoadHFiles(final List<Pair<byte[], String>> familyPaths,
124                          final Token<?> userToken,
125                          final String bulkToken,
126                          final byte[] startRow) throws IOException {
127     // we never want to send a batch of HFiles to all regions, thus cannot call
128     // HTable#coprocessorService methods that take start and end rowkeys; see HBASE-9639
129     try {
130       CoprocessorRpcChannel channel = table.coprocessorService(startRow);
131       SecureBulkLoadProtos.SecureBulkLoadService instance =
132           ProtobufUtil.newServiceStub(SecureBulkLoadProtos.SecureBulkLoadService.class, channel);
133 
134       SecureBulkLoadProtos.DelegationToken protoDT =
135           SecureBulkLoadProtos.DelegationToken.newBuilder().build();
136       if(userToken != null) {
137         protoDT =
138             SecureBulkLoadProtos.DelegationToken.newBuilder()
139               .setIdentifier(HBaseZeroCopyByteString.wrap(userToken.getIdentifier()))
140               .setPassword(HBaseZeroCopyByteString.wrap(userToken.getPassword()))
141               .setKind(userToken.getKind().toString())
142               .setService(userToken.getService().toString()).build();
143       }
144 
145       List<ClientProtos.BulkLoadHFileRequest.FamilyPath> protoFamilyPaths =
146           new ArrayList<ClientProtos.BulkLoadHFileRequest.FamilyPath>();
147       for(Pair<byte[], String> el: familyPaths) {
148         protoFamilyPaths.add(ClientProtos.BulkLoadHFileRequest.FamilyPath.newBuilder()
149           .setFamily(HBaseZeroCopyByteString.wrap(el.getFirst()))
150           .setPath(el.getSecond()).build());
151       }
152 
153       SecureBulkLoadProtos.SecureBulkLoadHFilesRequest request =
154           SecureBulkLoadProtos.SecureBulkLoadHFilesRequest.newBuilder()
155             .setFsToken(protoDT)
156             .addAllFamilyPath(protoFamilyPaths)
157             .setBulkToken(bulkToken).build();
158 
159       ServerRpcController controller = new ServerRpcController();
160       BlockingRpcCallback<SecureBulkLoadProtos.SecureBulkLoadHFilesResponse> rpcCallback =
161           new BlockingRpcCallback<SecureBulkLoadProtos.SecureBulkLoadHFilesResponse>();
162       instance.secureBulkLoadHFiles(controller,
163         request,
164         rpcCallback);
165 
166       SecureBulkLoadProtos.SecureBulkLoadHFilesResponse response = rpcCallback.get();
167       if (controller.failedOnException()) {
168         throw controller.getFailedOn();
169       }
170       return response.getLoaded();
171     } catch (Throwable throwable) {
172       throw new IOException(throwable);
173     }
174   }
175 
176   public Path getStagingPath(String bulkToken, byte[] family) throws IOException {
177     return SecureBulkLoadUtil.getStagingPath(table.getConfiguration(), bulkToken, family);
178   }
179 }