/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.thrift;

import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.StandardCharsets;
import java.security.PrivilegedExceptionAction;
import java.text.NumberFormat;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
import javax.security.auth.Subject;
import javax.security.auth.login.AppConfigurationEntry;
import javax.security.auth.login.Configuration;
import javax.security.auth.login.LoginContext;
import javax.security.sasl.Sasl;
import org.apache.hadoop.hbase.thrift.generated.AlreadyExists;
import org.apache.hadoop.hbase.thrift.generated.ColumnDescriptor;
import org.apache.hadoop.hbase.thrift.generated.Hbase;
import org.apache.hadoop.hbase.thrift.generated.Mutation;
import org.apache.hadoop.hbase.thrift.generated.TCell;
import org.apache.hadoop.hbase.thrift.generated.TRowResult;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TSaslClientTransport;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.apache.yetus.audience.InterfaceAudience;

/**
 * Command-line demo that exercises the HBase Thrift (version 1) API: it drops and
 * re-creates a table named {@code demo_table}, writes rows with valid and invalid
 * UTF-8 content, and reads them back through gets and scanners, printing what it
 * sees to standard output.
 *
 * <p>Usage: {@code DemoClient host port [secure=false [server-principal=hbase]]}.
 * When {@code secure} is true the connection is wrapped in a GSSAPI SASL transport
 * and the Kerberos credential is taken from the ticket cache.
 *
 * <p>See the instructions under hbase-examples/README.txt
 */
@InterfaceAudience.Private
public class DemoClient {

  /** Charset used for every string/byte conversion in this demo. */
  private static final Charset UTF_8 = StandardCharsets.UTF_8;

  protected static int port;
  protected static String host;
  CharsetDecoder decoder = null;

  private static boolean secure = false;
  private static String serverPrincipal = "hbase";

  /**
   * Parses the command line, then runs the demo as the subject returned by
   * {@link #getSubject()}.
   *
   * @param args {@code host port [secure [server-principal]]}
   * @throws Exception if login or any Thrift call fails
   */
  public static void main(String[] args) throws Exception {
    if (args.length < 2 || args.length > 4 || (args.length > 2 && !isBoolean(args[2]))) {
      printUsageAndExit();
    }

    try {
      port = Integer.parseInt(args[1]);
    } catch (NumberFormatException e) {
      // A non-numeric port is a usage error, not a crash.
      printUsageAndExit();
    }
    host = args[0];
    if (args.length > 2) {
      secure = Boolean.parseBoolean(args[2]);
    }

    if (args.length == 4) {
      serverPrincipal = args[3];
    }

    final DemoClient client = new DemoClient();
    Subject.doAs(getSubject(),
      new PrivilegedExceptionAction<Void>() {
        @Override
        public Void run() throws Exception {
          client.run();
          return null;
        }
      });
  }

  /** Prints the usage message and terminates the JVM with status -1. */
  private static void printUsageAndExit() {
    System.out.println("Invalid arguments!");
    System.out.println("Usage: DemoClient host port [secure=false [server-principal=hbase] ]");
    System.exit(-1);
  }

  /** Returns true when {@code s} is "true" or "false", ignoring case. */
  private static boolean isBoolean(String s) {
    return Boolean.TRUE.toString().equalsIgnoreCase(s)
        || Boolean.FALSE.toString().equalsIgnoreCase(s);
  }

  DemoClient() {
    decoder = UTF_8.newDecoder();
  }

  /**
   * Translates a whole byte array to a UTF-8 string.
   *
   * @return the decoded text, or {@code "[INVALID UTF-8]"} if the bytes are malformed
   */
  private String utf8(byte[] buf) {
    return utf8(ByteBuffer.wrap(buf));
  }

  /**
   * Translates the remaining bytes of {@code buf} to a UTF-8 string.
   *
   * <p>Only the bytes between the buffer's position and limit are decoded; using
   * {@code buf.array()} instead would decode the entire backing array, which is wrong
   * whenever the buffer is a slice of a larger array. A duplicate is decoded so the
   * caller's position is left untouched.
   *
   * @return the decoded text, or {@code "[INVALID UTF-8]"} if the bytes are malformed
   */
  private String utf8(ByteBuffer buf) {
    try {
      return decoder.decode(buf.duplicate()).toString();
    } catch (CharacterCodingException e) {
      return "[INVALID UTF-8]";
    }
  }

  /** Translates a string to its UTF-8 bytes. */
  private byte[] bytes(String s) {
    return s.getBytes(UTF_8);
  }

  /** Wraps the UTF-8 bytes of {@code s} in a fresh buffer. */
  private ByteBuffer wrap(String s) {
    return ByteBuffer.wrap(bytes(s));
  }

  /**
   * Opens the (optionally SASL-secured) transport to {@code host:port}, runs the demo,
   * and closes the transport even when the demo fails.
   */
  private void run() throws Exception {
    TTransport transport = new TSocket(host, port);
    if (secure) {
      Map<String, String> saslProperties = new HashMap<>();
      saslProperties.put(Sasl.QOP, "auth-conf,auth-int,auth");
      /*
       * The Thrift server the DemoClient is trying to connect to
       * must have a matching principal, and support authentication.
       *
       * The HBase cluster must be secure, allow proxy user.
       */
      transport = new TSaslClientTransport("GSSAPI", null,
        serverPrincipal, // Thrift server user name, should be an authorized proxy user.
        host, // Thrift server domain
        saslProperties, null, transport);
    }

    transport.open();
    try {
      TProtocol protocol = new TBinaryProtocol(transport, true, true);
      runDemo(new Hbase.Client(protocol));
    } finally {
      transport.close();
    }
  }

  /** Runs every step of the demo against an already connected client. */
  private void runDemo(Hbase.Client client) throws Exception {
    byte[] t = bytes("demo_table");

    deleteTableIfPresent(client, t);
    createDemoTable(client, t);

    Map<ByteBuffer, ByteBuffer> dummyAttributes = null;
    boolean writeToWal = false;

    //
    // Test UTF-8 handling
    //
    byte[] invalid = {(byte) 'f', (byte) 'o', (byte) 'o', (byte) '-',
      (byte) 0xfc, (byte) 0xa1, (byte) 0xa1, (byte) 0xa1, (byte) 0xa1};
    byte[] valid = {(byte) 'f', (byte) 'o', (byte) 'o', (byte) '-',
      (byte) 0xE7, (byte) 0x94, (byte) 0x9F, (byte) 0xE3, (byte) 0x83,
      (byte) 0x93, (byte) 0xE3, (byte) 0x83, (byte) 0xBC, (byte) 0xE3,
      (byte) 0x83, (byte) 0xAB};

    List<Mutation> mutations;
    // non-utf8 is fine for data
    mutations = new ArrayList<>(1);
    mutations.add(new Mutation(false, wrap("entry:foo"), ByteBuffer.wrap(invalid), writeToWal));
    client.mutateRow(ByteBuffer.wrap(t), wrap("foo"), mutations, dummyAttributes);

    // this row name is valid utf8
    mutations = new ArrayList<>(1);
    mutations.add(new Mutation(false, wrap("entry:foo"), ByteBuffer.wrap(valid), writeToWal));
    client.mutateRow(ByteBuffer.wrap(t), ByteBuffer.wrap(valid), mutations, dummyAttributes);

    // non-utf8 is now allowed in row names because HBase stores values as binary
    mutations = new ArrayList<>(1);
    mutations.add(new Mutation(false, wrap("entry:foo"), ByteBuffer.wrap(invalid), writeToWal));
    client.mutateRow(ByteBuffer.wrap(t), ByteBuffer.wrap(invalid), mutations, dummyAttributes);

    // Run a scanner on the rows we just created
    List<ByteBuffer> columnNames = new ArrayList<>();
    columnNames.add(wrap("entry:"));

    System.out.println("Starting scanner...");
    int scanner = client.scannerOpen(ByteBuffer.wrap(t), wrap(""), columnNames, dummyAttributes);
    printAllRows(client, scanner);

    //
    // Run some operations on a bunch of rows
    //
    // format row keys as "00000" to "00100"
    NumberFormat nf = NumberFormat.getInstance();
    nf.setMinimumIntegerDigits(5);
    nf.setGroupingUsed(false);
    for (int i = 100; i >= 0; --i) {
      byte[] row = bytes(nf.format(i));

      mutations = new ArrayList<>(1);
      mutations.add(new Mutation(false, wrap("unused:"), wrap("DELETE_ME"), writeToWal));
      client.mutateRow(ByteBuffer.wrap(t), ByteBuffer.wrap(row), mutations, dummyAttributes);
      printRow(client.getRow(ByteBuffer.wrap(t), ByteBuffer.wrap(row), dummyAttributes));
      client.deleteAllRow(ByteBuffer.wrap(t), ByteBuffer.wrap(row), dummyAttributes);

      // sleep to force later timestamp
      pause();

      mutations = new ArrayList<>(2);
      mutations.add(new Mutation(false, wrap("entry:num"), wrap("0"), writeToWal));
      mutations.add(new Mutation(false, wrap("entry:foo"), wrap("FOO"), writeToWal));
      client.mutateRow(ByteBuffer.wrap(t), ByteBuffer.wrap(row), mutations, dummyAttributes);
      printRow(client.getRow(ByteBuffer.wrap(t), ByteBuffer.wrap(row), dummyAttributes));

      Mutation m;
      mutations = new ArrayList<>(2);
      m = new Mutation();
      m.column = wrap("entry:foo");
      m.isDelete = true;
      mutations.add(m);
      m = new Mutation();
      m.column = wrap("entry:num");
      m.value = wrap("-1");
      mutations.add(m);
      client.mutateRow(ByteBuffer.wrap(t), ByteBuffer.wrap(row), mutations, dummyAttributes);
      printRow(client.getRow(ByteBuffer.wrap(t), ByteBuffer.wrap(row), dummyAttributes));

      mutations = new ArrayList<>();
      mutations.add(new Mutation(false, wrap("entry:num"),
        wrap(Integer.toString(i)), writeToWal));
      mutations.add(new Mutation(false, wrap("entry:sqr"),
        wrap(Integer.toString(i * i)), writeToWal));
      client.mutateRow(ByteBuffer.wrap(t), ByteBuffer.wrap(row), mutations, dummyAttributes);
      printRow(client.getRow(ByteBuffer.wrap(t), ByteBuffer.wrap(row), dummyAttributes));

      // sleep to force later timestamp
      pause();

      mutations.clear();
      m = new Mutation();
      m.column = wrap("entry:num");
      m.value = wrap("-999");
      mutations.add(m);
      m = new Mutation();
      m.column = wrap("entry:sqr");
      m.isDelete = true;
      // Previously this delete was built but never added, so it was silently dropped.
      mutations.add(m);
      // Written at timestamp 1, so it shouldn't override the latest values.
      client.mutateRowTs(ByteBuffer.wrap(t), ByteBuffer.wrap(row), mutations, 1,
        dummyAttributes);
      printRow(client.getRow(ByteBuffer.wrap(t), ByteBuffer.wrap(row), dummyAttributes));

      List<TCell> versions = client.getVer(ByteBuffer.wrap(t), ByteBuffer.wrap(row),
        wrap("entry:num"), 10, dummyAttributes);
      printVersions(ByteBuffer.wrap(row), versions);
      if (versions.isEmpty()) {
        System.out.println("FATAL: wrong # of versions");
        System.exit(-1);
      }

      List<TCell> result = client.get(ByteBuffer.wrap(t), ByteBuffer.wrap(row),
        wrap("entry:foo"), dummyAttributes);
      if (!result.isEmpty()) {
        System.out.println("FATAL: shouldn't get here");
        System.exit(-1);
      }

      System.out.println("");
    }

    // scan all rows/columnNames
    columnNames.clear();
    for (ColumnDescriptor col2 : client.getColumnDescriptors(ByteBuffer.wrap(t)).values()) {
      System.out.println("column with name: " + utf8(col2.name));
      System.out.println(col2.toString());

      columnNames.add(col2.name);
    }

    System.out.println("Starting scanner...");
    scanner = client.scannerOpenWithStop(ByteBuffer.wrap(t), wrap("00020"), wrap("00040"),
      columnNames, dummyAttributes);
    printAllRows(client, scanner);
    System.out.println("Scanner finished");
  }

  /** Scans all tables, looks for the demo table and, if found, disables and deletes it. */
  private void deleteTableIfPresent(Hbase.Client client, byte[] t) throws Exception {
    String wanted = utf8(t);
    System.out.println("scanning tables...");
    for (ByteBuffer name : client.getTableNames()) {
      String found = utf8(name);
      System.out.println(" found: " + found);
      if (found.equals(wanted)) {
        if (client.isTableEnabled(name)) {
          System.out.println(" disabling table: " + found);
          client.disableTable(name);
        }
        System.out.println(" deleting table: " + found);
        client.deleteTable(name);
      }
    }
  }

  /**
   * Creates the demo table with two column families, entry: and unused:, then prints
   * the column families the server reports for it.
   */
  private void createDemoTable(Hbase.Client client, byte[] t) throws Exception {
    List<ColumnDescriptor> columns = new ArrayList<>(2);
    ColumnDescriptor col;
    col = new ColumnDescriptor();
    col.name = wrap("entry:");
    col.timeToLive = Integer.MAX_VALUE;
    col.maxVersions = 10;
    columns.add(col);
    col = new ColumnDescriptor();
    col.name = wrap("unused:");
    col.timeToLive = Integer.MAX_VALUE;
    columns.add(col);

    System.out.println("creating table: " + utf8(t));
    try {
      client.createTable(ByteBuffer.wrap(t), columns);
    } catch (AlreadyExists ae) {
      System.out.println("WARN: " + ae.message);
    }

    System.out.println("column families in " + utf8(t) + ": ");
    Map<ByteBuffer, ColumnDescriptor> columnMap = client.getColumnDescriptors(ByteBuffer.wrap(t));
    for (ColumnDescriptor col2 : columnMap.values()) {
      System.out.println(" column: " + utf8(col2.name) + ", maxVer: " + col2.maxVersions);
    }
  }

  /**
   * Prints every row the scanner returns until it yields an empty batch, then closes
   * the scanner so it does not stay open on the server.
   */
  private void printAllRows(Hbase.Client client, int scanner) throws Exception {
    try {
      while (true) {
        List<TRowResult> entry = client.scannerGet(scanner);
        if (entry.isEmpty()) {
          break;
        }
        printRow(entry);
      }
    } finally {
      client.scannerClose(scanner);
    }
  }

  /** Sleeps 50 ms so that the next write gets a later timestamp. */
  private void pause() {
    try {
      Thread.sleep(50);
    } catch (InterruptedException e) {
      // Keep going, but preserve the interrupt for whoever is watching this thread.
      Thread.currentThread().interrupt();
    }
  }

  /** Prints the row key followed by each cell value in {@code versions}. */
  private void printVersions(ByteBuffer row, List<TCell> versions) {
    StringBuilder rowStr = new StringBuilder();
    for (TCell cell : versions) {
      rowStr.append(utf8(cell.value));
      rowStr.append("; ");
    }
    System.out.println("row: " + utf8(row) + ", values: " + rowStr);
  }

  /** Prints one row with its columns sorted by decoded column name. */
  private void printRow(TRowResult rowResult) {
    // copy values into a TreeMap to get them in sorted order
    SortedMap<String, TCell> sorted = new TreeMap<>();
    for (Map.Entry<ByteBuffer, TCell> column : rowResult.columns.entrySet()) {
      sorted.put(utf8(column.getKey()), column.getValue());
    }

    StringBuilder rowStr = new StringBuilder();
    for (Map.Entry<String, TCell> entry : sorted.entrySet()) {
      rowStr.append(entry.getKey());
      rowStr.append(" => ");
      rowStr.append(utf8(entry.getValue().value));
      rowStr.append("; ");
    }
    System.out.println("row: " + utf8(rowResult.row) + ", cols: " + rowStr);
  }

  /** Prints every row in {@code rows}. */
  private void printRow(List<TRowResult> rows) {
    for (TRowResult rowResult : rows) {
      printRow(rowResult);
    }
  }

  /**
   * Returns the subject the demo runs as: an empty subject when not in secure mode,
   * otherwise the subject obtained by a Kerberos login from the ticket cache.
   *
   * @throws Exception if the Kerberos login fails
   */
  static Subject getSubject() throws Exception {
    if (!secure) {
      return new Subject();
    }

    /*
     * To authenticate the DemoClient, kinit should be invoked ahead.
     * Here we try to get the Kerberos credential from the ticket cache.
     */
    LoginContext context = new LoginContext("", new Subject(), null,
      new Configuration() {
        @Override
        public AppConfigurationEntry[] getAppConfigurationEntry(String name) {
          Map<String, String> options = new HashMap<>();
          options.put("useKeyTab", "false");
          options.put("storeKey", "false");
          options.put("doNotPrompt", "true");
          options.put("useTicketCache", "true");
          options.put("renewTGT", "true");
          options.put("refreshKrb5Config", "true");
          options.put("isInitiator", "true");
          String ticketCache = System.getenv("KRB5CCNAME");
          if (ticketCache != null) {
            options.put("ticketCache", ticketCache);
          }
          options.put("debug", "true");

          return new AppConfigurationEntry[]{
            new AppConfigurationEntry("com.sun.security.auth.module.Krb5LoginModule",
              AppConfigurationEntry.LoginModuleControlFlag.REQUIRED,
              options)};
        }
      });
    context.login();
    return context.getSubject();
  }
}