当前位置:   article > 正文

利用phoenix进行Hbase数据访问_当无法预知json数据字段的情况下如何使用phoenix

当无法预知json数据字段的情况下如何使用phoenix

Hadoop HBase Hlive 

一、背景

近期一个用户画像的项目,数据量庞大,用MySQL进行存取不太现实,所以采用Hbase集群的方案来实施。由于业务层使用的是PHP,所以研发同学首先想到的是PHP-Thrift来访问Hbase,编码实验了几天,效果不是太理想,尤其是编码成本较大,各种scan、filter之类的语法,不利于团队进行快速开发;当然,最崩溃的还是想利用count进行数据总量计算,是Thrift里,这个太难搞。

所以再换一个phoenix的方案,模拟SQL的形式进行Hbase数据访问;不过这东西没有PHP版本的,只有Hbasejar包支持,还有一个python版本的command line console,开发过程中用来做数据查看还是比较方便的。

二、环境部署

1、phoenix下载

2、部署jar包到Hbase集群

  1. # 下载phoenix
  2. wget http://apache.fayea.com/phoenix/phoenix-4.7.0-HBase-1.1/bin/phoenix-4.7.0-HBase-1.1-bin.tar.gz
  3. # 解压
  4. tar zxfv phoenix-4.7.0-HBase-1.1-bin.tar.gz > /dev/null
  5. # 部署jar包到hbase
  6. cp -r phoenix-4.7.0-HBase-1.1/*.jar /home/hbase/hbase-1.1.5/lib/
  7. # 重启Hbase
  8. /home/hbase/hbase-1.1.5/bin/stop-hbase.sh
  9. /home/hbase/hbase-1.1.5/bin/start-hbase.sh

3、验证phoenix安装情况

  1. cd /home/hbase/phoenix-4.7.0-HBase-1.1/bin
  2. ./sqlline.py localhost:2181

出现下图所示的样子,就算是安装成功了:

1.png

敲击 !help 命令,查看内置命令:

  1. 0: jdbc:phoenix:localhost:2181> !help
  2. !all Execute the specified SQL against all the current connections
  3. !autocommit Set autocommit mode on or off
  4. !batch Start or execute a batch of statements
  5. !brief Set verbose mode off
  6. !call Execute a callable statement
  7. !close Close the current connection to the database
  8. !closeall Close all current open connections
  9. !columns List all the columns for the specified table
  10. !commit Commit the current transaction (if autocommit is off)
  11. !connect Open a new connection to the database.
  12. !dbinfo Give metadata information about the database
  13. !describe Describe a table
  14. !dropall Drop all tables in the current database
  15. !exportedkeys List all the exported keys for the specified table
  16. !go Select the current connection
  17. !help Print a summary of command usage
  18. !history Display the command history
  19. !importedkeys List all the imported keys for the specified table
  20. !indexes List all the indexes for the specified table
  21. !isolation Set the transaction isolation for this connection
  22. !list List the current connections
  23. !manual Display the SQLLine manual
  24. !metadata Obtain metadata information
  25. !nativesql Show the native SQL for the specified statement
  26. !outputformat Set the output format for displaying results
  27. (table,vertical,csv,tsv,xmlattrs,xmlelements)
  28. !primarykeys List all the primary keys for the specified table
  29. !procedures List all the procedures
  30. !properties Connect to the database specified in the properties file(s)
  31. !quit Exits the program
  32. !reconnect Reconnect to the database
  33. !record Record all output to the specified file
  34. !rehash Fetch table and column names for command completion
  35. !rollback Roll back the current transaction (if autocommit is off)
  36. !run Run a script from the specified file
  37. !save Save the current variabes and aliases
  38. !scan Scan for installed JDBC drivers
  39. !script Start saving a script to a file
  40. !set Set a sqlline variable
  41. ......

4、查看DB中已经存在的表

0: jdbc:phoenix:localhost:2181> !table

2.png

5、查看表结构(隐藏列族名)

0: jdbc:phoenix:localhost:2181> !describe "xxx"

3.png

注意:phoenix/hbase对表名、字段名都是大小写敏感,如果直接写小写字母,不加双引号,则默认会被转换成大写字母。

6、查看表内容

0: jdbc:phoenix:localhost:2181> select * from "xxx" ;

4.png

PhoenixSQL的语法跟MySQL语法没多大区别,入门成本较低。注意,如果Hbase的表已经有了,则需要手动再在Phoenix中创建同名(注意双引号括起来的大小写)的Table。

三、开发

Phoenix提供的是Hbase的jar包支持,所以肯定是创建一个Java Web Project来提供API服务。

1、设计原则

  • 模拟Python版本Command line Console的操作,直接接受原生Phoenix-SQL作为参数进行处理
  • Phoenix DB不支持直接设置连接超时, 所以这里使用线程池的方式来控制数据库连接超时
  • SQL处理后的结果存放在一个PhoenixResultSet中,SQL本身不固定,所以结果字段也不固定;所以这里使用PhoenixResultSet.getMetaData()来获取返回的字段名
  • 上层应用一般不要求数据返回的类型,所以全部采用PhoenixResultSet.getString(index)的形式获取字符串类型字段值
  • 最终数据编译成JSON格式进行返回,借助org.json.jar包来处理

2、编码实现

1)、PhoenixClient.java
  1. package com.qudian.bi;
  2. import java.sql.Connection;
  3. import java.sql.DriverManager;
  4. import java.sql.ResultSetMetaData;
  5. import java.sql.SQLException;
  6. import java.sql.Statement;
  7. import java.util.ArrayList;
  8. import java.util.concurrent.Callable;
  9. import java.util.concurrent.ExecutionException;
  10. import java.util.concurrent.ExecutorService;
  11. import java.util.concurrent.Executors;
  12. import java.util.concurrent.Future;
  13. import java.util.concurrent.TimeUnit;
  14. import java.util.concurrent.TimeoutException;
  15. import org.apache.phoenix.jdbc.PhoenixResultSet;
  16. import org.json.JSONArray;
  17. import org.json.JSONException;
  18. import org.json.JSONObject;
  19. /**
  20. * 利用Phoenix访问Hbase
  21. *
  22. * @author zhaoxianlie
  23. */
  24. public class PhoenixClient {
  25. /**
  26. * 利用静态块的方式初始化Driver,防止Tomcat加载不到(有时候比较诡异)
  27. */
  28. static {
  29. try {
  30. Class.forName("org.apache.phoenix.jdbc.PhoenixDriver");
  31. } catch (ClassNotFoundException e) {
  32. e.printStackTrace();
  33. }
  34. }
  35. /**
  36. * 获取一个Hbase-Phoenix的连接
  37. *
  38. * @param host
  39. * zookeeper的master-host
  40. * @param port
  41. * zookeeper的master-port
  42. * @return
  43. */
  44. private static Connection getConnection(String host, String port) {
  45. Connection cc = null;
  46. final String url = "jdbc:phoenix:" + host + ":" + port;
  47. if (cc == null) {
  48. try {
  49. // Phoenix DB不支持直接设置连接超时
  50. // 所以这里使用线程池的方式来控制数据库连接超时
  51. final ExecutorService exec = Executors.newFixedThreadPool(1);
  52. Callable<Connection> call = new Callable<Connection>() {
  53. public Connection call() throws Exception {
  54. return DriverManager.getConnection(url);
  55. }
  56. };
  57. Future<Connection> future = exec.submit(call);
  58. // 如果在5s钟之内,还没得到 Connection 对象,则认为连接超时,不继续阻塞,防止服务夯死
  59. cc = future.get(1000 * 5, TimeUnit.MILLISECONDS);
  60. exec.shutdownNow();
  61. } catch (InterruptedException e) {
  62. e.printStackTrace();
  63. } catch (ExecutionException e) {
  64. e.printStackTrace();
  65. } catch (TimeoutException e) {
  66. e.printStackTrace();
  67. }
  68. }
  69. return cc;
  70. }
  71. /**
  72. * 根据host、port,以及sql查询hbase中的内容;根据phoenix支持的SQL格式,查询Hbase的数据,并返回json格式的数据
  73. *
  74. * @param host
  75. * zookeeper的master-host
  76. * @param port
  77. * zookeeper的master-port
  78. * @param phoenixSQL
  79. * sql语句
  80. * @return json-string
  81. * @return
  82. */
  83. public static String execSql(String host, String port, String phoenixSQL) {
  84. if (host == null || port == null || host.trim() == ""
  85. || port.trim() == "") {
  86. return "必须指定hbase master的IP和端口";
  87. } else if (phoenixSQL == null || phoenixSQL.trim() == "") {
  88. return "请指定合法的Phoenix SQL!";
  89. }
  90. String result = "";
  91. try {
  92. // 耗时监控:记录一个开始时间
  93. long startTime = System.currentTimeMillis();
  94. // 获取一个Phoenix DB连接
  95. Connection conn = PhoenixClient.getConnection(host, port);
  96. if (conn == null) {
  97. return "Phoenix DB连接超时!";
  98. }
  99. // 准备查询
  100. Statement stmt = conn.createStatement();
  101. PhoenixResultSet set = (PhoenixResultSet) stmt
  102. .executeQuery(phoenixSQL);
  103. // 查询出来的列是不固定的,所以这里通过遍历的方式获取列名
  104. ResultSetMetaData meta = set.getMetaData();
  105. ArrayList<String> cols = new ArrayList<String>();
  106. // 把最终数据都转成JSON返回
  107. JSONArray jsonArr = new JSONArray();
  108. while (set.next()) {
  109. if (cols.size() == 0) {
  110. for (int i = 1, count = meta.getColumnCount(); i <= count; i++) {
  111. cols.add(meta.getColumnName(i));
  112. }
  113. }
  114. JSONObject json = new JSONObject();
  115. for (int i = 0, len = cols.size(); i < len; i++) {
  116. json.put(cols.get(i), set.getString(cols.get(i)));
  117. }
  118. jsonArr.put(json);
  119. }
  120. // 耗时监控:记录一个结束时间
  121. long endTime = System.currentTimeMillis();
  122. // 结果封装
  123. JSONObject data = new JSONObject();
  124. data.put("data", jsonArr);
  125. data.put("cost", (endTime - startTime) + " ms");
  126. result = data.toString();
  127. } catch (SQLException e) {
  128. e.printStackTrace();
  129. return "SQL执行出错:" + e.getMessage();
  130. } catch (JSONException e) {
  131. e.printStackTrace();
  132. return "JSON转换出错:" + e.getMessage();
  133. }
  134. return result;
  135. }
  136. /**
  137. * Just for phoenix test!
  138. * @param args
  139. */
  140. public static void main(String[] args) {
  141. String pheonixSQL = "select count(1) from \"t\"";
  142. String host = "localhost";
  143. if(args.length >= 1) {
  144. host = args[0];
  145. }
  146. String result = PhoenixClient.execSql(host, "2181", pheonixSQL);
  147. System.out.println(result);
  148. }
  149. }
2)、Servlet
  1. public void doGet(HttpServletRequest request, HttpServletResponse response)
  2. throws ServletException, IOException {
  3. response.setContentType("application/json;charset=utf-8");
  4. PrintWriter out = response.getWriter();
  5. String host = request.getParameter("host");
  6. String port = request.getParameter("port");
  7. if (host == null || port == null || host.trim() == ""
  8. || port.trim() == "") {
  9. ServletContext context = getServletContext();
  10. host = context.getInitParameter("hbase-master-ip");
  11. port = context.getInitParameter("hbase-master-port");
  12. }
  13. String phoenixSQL = request.getParameter("sql");
  14. String json = PhoenixClient.execSql(host, port, phoenixSQL);
  15. out.println(json);
  16. out.flush();
  17. out.close();
  18. }

四、使用

所有SQL都需要进行urlencode / encodeURIComponent处理

1、查询xxx表的记录条数

  1. # phoenix sql、做 url encode 处理
  2. $sql = 'select count(1) from "xxx"';
  3. $sql = urlencode($sql);
  4. # 访问下面接口获取数据
  5. $url = 'http://localhost:8080?host=localhost&port=2181&sql=' . $sql ;

返回的数据格式:

  1. {
  2. "data": [
  3. {
  4. "COUNT(1)": "4"
  5. }
  6. ],
  7. "cost": "199 ms"
  8. }

COUNT(1)作为字段名感觉很奇怪,对应的SQL也可以改一下,加个别名,如:

$sql = 'select count(1) as "count" from "xxx"';

得到的结果为:

  1. {
  2. "data": [
  3. {
  4. "count": "4"
  5. }
  6. ],
  7. "cost": "93 ms"
  8. }

2、查询表里的所有数据(结果集太大就别这么玩儿了)

$sql = 'select * from "xxx"';

得到的结果为:

  1. {
  2. "data": [
  3. {
  4. "val3": "ehhhh",
  5. "ROW": "key1",
  6. "val1": "ehhhh",
  7. "val2": "ehhhh"
  8. },
  9. {
  10. "ROW": "key2",
  11. "val1": "hhhhh"
  12. },
  13. {
  14. "ROW": "key3",
  15. "val1": "hhhhh3"
  16. },
  17. {
  18. "ROW": "key4",
  19. "val1": "hhhhh4"
  20. }
  21. ],
  22. "cost": "19 ms"
  23. }

3、只获取某个字段,且进行条件过滤

$sql = 'select ROW,"val1" from "xxx" where "val1"=\'hhhhh4\'';

得到结果集:

  1. {
  2. "data": [
  3. {
  4. "ROW": "key3",
  5. "val1": "hhhhh3"
  6. }
  7. ],
  8. "cost": "24 ms"
  9. }

其他的情况,就不举例了。

五、总结

就完全可以把Phoenix当成MySQL来用,要想速度快,还是建立好索引再使用;在数据量庞大的情况下,有索引和没索引,查询速度是天壤之别的。

如果你也正好在玩儿这个东西,希望对你有帮助。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/爱喝兽奶帝天荒/article/detail/953099
推荐阅读
相关标签
  

闽ICP备14008679号