001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.thrift;
020
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.StandardCharsets;
import java.security.PrivilegedExceptionAction;
import java.text.NumberFormat;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
import javax.security.auth.Subject;
import javax.security.auth.login.AppConfigurationEntry;
import javax.security.auth.login.Configuration;
import javax.security.auth.login.LoginContext;
import javax.security.sasl.Sasl;
import org.apache.hadoop.hbase.thrift.generated.AlreadyExists;
import org.apache.hadoop.hbase.thrift.generated.ColumnDescriptor;
import org.apache.hadoop.hbase.thrift.generated.Hbase;
import org.apache.hadoop.hbase.thrift.generated.Mutation;
import org.apache.hadoop.hbase.thrift.generated.TCell;
import org.apache.hadoop.hbase.thrift.generated.TRowResult;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TSaslClientTransport;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.apache.yetus.audience.InterfaceAudience;
051
052/**
053 * See the instructions under hbase-examples/README.txt
054 */
055@InterfaceAudience.Private
056public class DemoClient {
057
058  static protected int port;
059  static protected String host;
060  CharsetDecoder decoder = null;
061
062  private static boolean secure = false;
063  private static String serverPrincipal = "hbase";
064
065  public static void main(String[] args) throws Exception {
066    if (args.length < 2 || args.length > 4 || (args.length > 2 && !isBoolean(args[2]))) {
067      System.out.println("Invalid arguments!");
068      System.out.println("Usage: DemoClient host port [secure=false [server-principal=hbase] ]");
069
070      System.exit(-1);
071    }
072
073    port = Integer.parseInt(args[1]);
074    host = args[0];
075
076    if (args.length > 2) {
077      secure = Boolean.parseBoolean(args[2]);
078    }
079
080    if (args.length == 4) {
081      serverPrincipal = args[3];
082    }
083
084    final DemoClient client = new DemoClient();
085    Subject.doAs(getSubject(),
086      new PrivilegedExceptionAction<Void>() {
087        @Override
088        public Void run() throws Exception {
089          client.run();
090          return null;
091        }
092      });
093  }
094
095  private static boolean isBoolean(String s){
096    return Boolean.TRUE.toString().equalsIgnoreCase(s) ||
097            Boolean.FALSE.toString().equalsIgnoreCase(s);
098  }
099
100  DemoClient() {
101    decoder = Charset.forName("UTF-8").newDecoder();
102  }
103
104  // Helper to translate byte[]'s to UTF8 strings
105  private String utf8(byte[] buf) {
106    try {
107      return decoder.decode(ByteBuffer.wrap(buf)).toString();
108    } catch (CharacterCodingException e) {
109      return "[INVALID UTF-8]";
110    }
111  }
112
113    // Helper to translate strings to UTF8 bytes
114    private byte[] bytes(String s) {
115        try {
116            return s.getBytes("UTF-8");
117        } catch (UnsupportedEncodingException e) {
118            e.printStackTrace();
119            return null;
120        }
121    }
122
123  private void run() throws Exception {
124    TTransport transport = new TSocket(host, port);
125    if (secure) {
126      Map<String, String> saslProperties = new HashMap<>();
127      saslProperties.put(Sasl.QOP, "auth-conf,auth-int,auth");
128      /*
129       * The Thrift server the DemoClient is trying to connect to
130       * must have a matching principal, and support authentication.
131       *
132       * The HBase cluster must be secure, allow proxy user.
133       */
134      transport = new TSaslClientTransport("GSSAPI", null,
135              serverPrincipal, // Thrift server user name, should be an authorized proxy user.
136              host, // Thrift server domain
137              saslProperties, null, transport);
138    }
139
140    transport.open();
141
142    TProtocol protocol = new TBinaryProtocol(transport, true, true);
143    Hbase.Client client = new Hbase.Client(protocol);
144
145    byte[] t = bytes("demo_table");
146
147    // Scan all tables, look for the demo table and delete it.
148    System.out.println("scanning tables...");
149
150    for (ByteBuffer name : client.getTableNames()) {
151      System.out.println("  found: " + utf8(name.array()));
152
153      if (utf8(name.array()).equals(utf8(t))) {
154        if (client.isTableEnabled(name)) {
155          System.out.println("    disabling table: " + utf8(name.array()));
156          client.disableTable(name);
157        }
158
159        System.out.println("    deleting table: " + utf8(name.array()));
160        client.deleteTable(name);
161      }
162    }
163
164    // Create the demo table with two column families, entry: and unused:
165    ArrayList<ColumnDescriptor> columns = new ArrayList<>(2);
166    ColumnDescriptor col;
167    col = new ColumnDescriptor();
168    col.name = ByteBuffer.wrap(bytes("entry:"));
169    col.timeToLive = Integer.MAX_VALUE;
170    col.maxVersions = 10;
171    columns.add(col);
172    col = new ColumnDescriptor();
173    col.name = ByteBuffer.wrap(bytes("unused:"));
174    col.timeToLive = Integer.MAX_VALUE;
175    columns.add(col);
176
177    System.out.println("creating table: " + utf8(t));
178
179    try {
180      client.createTable(ByteBuffer.wrap(t), columns);
181    } catch (AlreadyExists ae) {
182      System.out.println("WARN: " + ae.message);
183    }
184
185    System.out.println("column families in " + utf8(t) + ": ");
186    Map<ByteBuffer, ColumnDescriptor> columnMap = client.getColumnDescriptors(ByteBuffer.wrap(t));
187
188    for (ColumnDescriptor col2 : columnMap.values()) {
189      System.out.println("  column: " + utf8(col2.name.array()) + ", maxVer: " + col2.maxVersions);
190    }
191
192    Map<ByteBuffer, ByteBuffer> dummyAttributes = null;
193    boolean writeToWal = false;
194
195    // Test UTF-8 handling
196    byte[] invalid = {(byte) 'f', (byte) 'o', (byte) 'o', (byte) '-',
197            (byte) 0xfc, (byte) 0xa1, (byte) 0xa1, (byte) 0xa1, (byte) 0xa1};
198    byte[] valid = {(byte) 'f', (byte) 'o', (byte) 'o', (byte) '-',
199            (byte) 0xE7, (byte) 0x94, (byte) 0x9F, (byte) 0xE3, (byte) 0x83,
200            (byte) 0x93, (byte) 0xE3, (byte) 0x83, (byte) 0xBC, (byte) 0xE3,
201            (byte) 0x83, (byte) 0xAB};
202
203    ArrayList<Mutation> mutations;
204    // non-utf8 is fine for data
205    mutations = new ArrayList<>(1);
206    mutations.add(new Mutation(false, ByteBuffer.wrap(bytes("entry:foo")),
207            ByteBuffer.wrap(invalid), writeToWal));
208    client.mutateRow(ByteBuffer.wrap(t), ByteBuffer.wrap(bytes("foo")),
209            mutations, dummyAttributes);
210
211    // this row name is valid utf8
212    mutations = new ArrayList<>(1);
213    mutations.add(new Mutation(false, ByteBuffer.wrap(bytes("entry:foo")),
214            ByteBuffer.wrap(valid), writeToWal));
215    client.mutateRow(ByteBuffer.wrap(t), ByteBuffer.wrap(valid), mutations, dummyAttributes);
216
217    // non-utf8 is now allowed in row names because HBase stores values as binary
218    mutations = new ArrayList<>(1);
219    mutations.add(new Mutation(false, ByteBuffer.wrap(bytes("entry:foo")),
220            ByteBuffer.wrap(invalid), writeToWal));
221    client.mutateRow(ByteBuffer.wrap(t), ByteBuffer.wrap(invalid), mutations, dummyAttributes);
222
223    // Run a scanner on the rows we just created
224    ArrayList<ByteBuffer> columnNames = new ArrayList<>();
225    columnNames.add(ByteBuffer.wrap(bytes("entry:")));
226
227    System.out.println("Starting scanner...");
228    int scanner = client.scannerOpen(ByteBuffer.wrap(t), ByteBuffer.wrap(bytes("")), columnNames,
229            dummyAttributes);
230
231    while (true) {
232      List<TRowResult> entry = client.scannerGet(scanner);
233
234      if (entry.isEmpty()) {
235        break;
236      }
237
238      printRow(entry);
239    }
240
241    // Run some operations on a bunch of rows
242    for (int i = 100; i >= 0; --i) {
243      // format row keys as "00000" to "00100"
244      NumberFormat nf = NumberFormat.getInstance();
245      nf.setMinimumIntegerDigits(5);
246      nf.setGroupingUsed(false);
247      byte[] row = bytes(nf.format(i));
248
249      mutations = new ArrayList<>(1);
250      mutations.add(new Mutation(false, ByteBuffer.wrap(bytes("unused:")),
251              ByteBuffer.wrap(bytes("DELETE_ME")), writeToWal));
252      client.mutateRow(ByteBuffer.wrap(t), ByteBuffer.wrap(row), mutations, dummyAttributes);
253      printRow(client.getRow(ByteBuffer.wrap(t), ByteBuffer.wrap(row), dummyAttributes));
254      client.deleteAllRow(ByteBuffer.wrap(t), ByteBuffer.wrap(row), dummyAttributes);
255
256      // sleep to force later timestamp
257      try {
258        Thread.sleep(50);
259      } catch (InterruptedException e) {
260        // no-op
261      }
262
263      mutations = new ArrayList<>(2);
264      mutations.add(new Mutation(false, ByteBuffer.wrap(bytes("entry:num")),
265              ByteBuffer.wrap(bytes("0")), writeToWal));
266      mutations.add(new Mutation(false, ByteBuffer.wrap(bytes("entry:foo")),
267              ByteBuffer.wrap(bytes("FOO")), writeToWal));
268      client.mutateRow(ByteBuffer.wrap(t), ByteBuffer.wrap(row), mutations, dummyAttributes);
269      printRow(client.getRow(ByteBuffer.wrap(t), ByteBuffer.wrap(row), dummyAttributes));
270
271      Mutation m;
272      mutations = new ArrayList<>(2);
273      m = new Mutation();
274      m.column = ByteBuffer.wrap(bytes("entry:foo"));
275      m.isDelete = true;
276      mutations.add(m);
277      m = new Mutation();
278      m.column = ByteBuffer.wrap(bytes("entry:num"));
279      m.value = ByteBuffer.wrap(bytes("-1"));
280      mutations.add(m);
281      client.mutateRow(ByteBuffer.wrap(t), ByteBuffer.wrap(row), mutations, dummyAttributes);
282      printRow(client.getRow(ByteBuffer.wrap(t), ByteBuffer.wrap(row), dummyAttributes));
283
284      mutations = new ArrayList<>();
285      mutations.add(new Mutation(false, ByteBuffer.wrap(bytes("entry:num")),
286              ByteBuffer.wrap(bytes(Integer.toString(i))), writeToWal));
287      mutations.add(new Mutation(false, ByteBuffer.wrap(bytes("entry:sqr")),
288              ByteBuffer.wrap(bytes(Integer.toString(i * i))), writeToWal));
289      client.mutateRow(ByteBuffer.wrap(t), ByteBuffer.wrap(row), mutations, dummyAttributes);
290      printRow(client.getRow(ByteBuffer.wrap(t), ByteBuffer.wrap(row), dummyAttributes));
291
292      // sleep to force later timestamp
293      try {
294        Thread.sleep(50);
295      } catch (InterruptedException e) {
296        // no-op
297      }
298
299      mutations.clear();
300      m = new Mutation();
301      m.column = ByteBuffer.wrap(bytes("entry:num"));
302      m.value= ByteBuffer.wrap(bytes("-999"));
303      mutations.add(m);
304      m = new Mutation();
305      m.column = ByteBuffer.wrap(bytes("entry:sqr"));
306      m.isDelete = true;
307      client.mutateRowTs(ByteBuffer.wrap(t), ByteBuffer.wrap(row), mutations, 1,
308              dummyAttributes); // shouldn't override latest
309      printRow(client.getRow(ByteBuffer.wrap(t), ByteBuffer.wrap(row), dummyAttributes));
310
311      List<TCell> versions = client.getVer(ByteBuffer.wrap(t), ByteBuffer.wrap(row),
312              ByteBuffer.wrap(bytes("entry:num")), 10, dummyAttributes);
313      printVersions(ByteBuffer.wrap(row), versions);
314
315      if (versions.isEmpty()) {
316        System.out.println("FATAL: wrong # of versions");
317        System.exit(-1);
318      }
319
320      List<TCell> result = client.get(ByteBuffer.wrap(t), ByteBuffer.wrap(row),
321              ByteBuffer.wrap(bytes("entry:foo")), dummyAttributes);
322
323      if (!result.isEmpty()) {
324        System.out.println("FATAL: shouldn't get here");
325        System.exit(-1);
326      }
327
328      System.out.println("");
329    }
330
331    // scan all rows/columnNames
332    columnNames.clear();
333
334    for (ColumnDescriptor col2 : client.getColumnDescriptors(ByteBuffer.wrap(t)).values()) {
335      System.out.println("column with name: " + new String(col2.name.array()));
336      System.out.println(col2.toString());
337
338      columnNames.add(col2.name);
339    }
340
341    System.out.println("Starting scanner...");
342    scanner = client.scannerOpenWithStop(ByteBuffer.wrap(t), ByteBuffer.wrap(bytes("00020")),
343            ByteBuffer.wrap(bytes("00040")), columnNames, dummyAttributes);
344
345    while (true) {
346      List<TRowResult> entry = client.scannerGet(scanner);
347
348      if (entry.isEmpty()) {
349        System.out.println("Scanner finished");
350        break;
351      }
352
353      printRow(entry);
354    }
355
356    transport.close();
357  }
358
359  private void printVersions(ByteBuffer row, List<TCell> versions) {
360    StringBuilder rowStr = new StringBuilder();
361
362    for (TCell cell : versions) {
363      rowStr.append(utf8(cell.value.array()));
364      rowStr.append("; ");
365    }
366
367    System.out.println("row: " + utf8(row.array()) + ", values: " + rowStr);
368  }
369
370  private void printRow(TRowResult rowResult) {
371    // copy values into a TreeMap to get them in sorted order
372    TreeMap<String, TCell> sorted = new TreeMap<>();
373
374    for (Map.Entry<ByteBuffer, TCell> column : rowResult.columns.entrySet()) {
375      sorted.put(utf8(column.getKey().array()), column.getValue());
376    }
377
378    StringBuilder rowStr = new StringBuilder();
379
380    for (SortedMap.Entry<String, TCell> entry : sorted.entrySet()) {
381      rowStr.append(entry.getKey());
382      rowStr.append(" => ");
383      rowStr.append(utf8(entry.getValue().value.array()));
384      rowStr.append("; ");
385    }
386
387    System.out.println("row: " + utf8(rowResult.row.array()) + ", cols: " + rowStr);
388  }
389
390  private void printRow(List<TRowResult> rows) {
391    for (TRowResult rowResult : rows) {
392      printRow(rowResult);
393    }
394  }
395
396  static Subject getSubject() throws Exception {
397    if (!secure) {
398      return new Subject();
399    }
400
401    /*
402     * To authenticate the DemoClient, kinit should be invoked ahead.
403     * Here we try to get the Kerberos credential from the ticket cache.
404     */
405    LoginContext context = new LoginContext("", new Subject(), null,
406        new Configuration() {
407          @Override
408          public AppConfigurationEntry[] getAppConfigurationEntry(String name) {
409            Map<String, String> options = new HashMap<>();
410            options.put("useKeyTab", "false");
411            options.put("storeKey", "false");
412            options.put("doNotPrompt", "true");
413            options.put("useTicketCache", "true");
414            options.put("renewTGT", "true");
415            options.put("refreshKrb5Config", "true");
416            options.put("isInitiator", "true");
417            String ticketCache = System.getenv("KRB5CCNAME");
418
419            if (ticketCache != null) {
420              options.put("ticketCache", ticketCache);
421            }
422
423            options.put("debug", "true");
424
425            return new AppConfigurationEntry[]{
426                new AppConfigurationEntry("com.sun.security.auth.module.Krb5LoginModule",
427                    AppConfigurationEntry.LoginModuleControlFlag.REQUIRED,
428                    options)};
429          }
430        });
431
432    context.login();
433    return context.getSubject();
434  }
435}