一、创建一个HBase表
1、登录Linux系统,打开一个终端,启动HBase。
2、在HBase数据库中创建student表,启动hbase shell,用list命令显示当前HBase数据库中有哪些已经创建好的表:
hbase> list
如果里面有一个名为student的表,使用以下命令删除:
hbase> disable 'student'
hbase> drop 'student'
3、确认无student表后,使用以下命令创建
hbase> create 'student','info'
在创建student表的create命令中,命令后面首先跟上表名称 'student',然后,再跟上列族名称 'info',这个列族 'info'中包含三个列 'name', 'gender', 'age'。会发现没有 'id '字段,这是因为HBase的表中会有一个系统默认的属性作为行键,无需自行创建,默认把put命令操作中跟在表名后的第一个字段作为行健。
4、HBase中用put命令添加数据:
//第一个学生记录
hbase> put 'student','1','info:name','Xueqian'
hbase> put 'student','1','info:gender','F'
hbase> put 'student','1','info:age','23'
//第二个学生记录
hbase> put 'student','2','info:name','Weiliang'
hbase> put 'student','2','info:gender','M'
hbase> put 'student','2','info:age','24'
5、数据录入结束后,可以用下面的命令查看已经录入的数据:
Hbase> scan 'student'
可以得到如下结果:
二、配置Spark
在开始编程操作HBase数据库之前,需要做一些准备工作。
新建一个终端,执行下面命令,把HBase的lib目录下的一些jar文件拷贝到Spark中,这些都是编程时需要引入的jar包,需要拷贝的jar文件包括:所有hbase开头的jar文件、guava-12.0.1.jar、
htrace-core-3.1.0-incubating.jar和protobuf-java-2.5.0.jar:
$ cd /mnt/data/spark-3.0.1-bin-hadoop2.7/jars
$ mkdir hbase
$ cd hbase
$ cp /mnt/data/hbase-2.2.5/lib/hbase*.jar ./
$ cp /mnt/data/hbase-2.2.5/lib/guava-11.0.2.jar ./
$ cp /mnt/data/hbase-2.2.5/lib/htrace-core-3.1.0-incubating.jar ./
$ cp /mnt/data/hbase-2.2.5/lib/protobuf-java-2.5.0.jar ./
需要注意:在Spark 2.0版本上缺少相关把hbase的数据转换python可读取的jar包,需要我们另行下载。打开
https://mvnrepository.com/artifact/org.apache.spark/spark-examples_2.11/1.6.0-typesafe-001下载jar包,上传至/mnt/data/spark-3.0.1-bin-hadoop2.7/jars/hbase目录下。
然后,使用vim编辑器打开spark-env.sh文件,设置Spark的spark-env.sh文件,告诉Spark可以在哪个路径下找到HBase相关的jar文件,命令如下:
$ cd /mnt/data/spark-3.0.1-bin-hadoop2.7/conf
$ vim spark-env.sh
打开spark-env.sh文件以后,在文件最前面增加下面一行内容:
export SPARK_DIST_CLASSPATH=$(/mnt/data/hbase-2.2.5/bin/hbase classpath):/mnt/data/spark-3.0.1-bin-hadoop2.7/jars/hbase/*
只有这样,后面的编译和运行过程才不会出错。
三、编写程序读取HBase数据
如果要让Spark读取HBase,就需要使用SparkContext提供的newAPIHadoopRDD API将表的内容以RDD的形式加载到Spark中。
在
/mnt/data/spark-3.0.1-bin-hadoop2.7目录下新建一个test文件夹,在文件夹中新建一个test的文本文档,然后输入如下代码,保存后讲test.txt修改后缀为test.py:
from pyspark.sql import SQLContext
from pyspark.sql.session import SparkSession
spark=SparkSession.builder.getOrCreate()
sc = spark.sparkContext
host = 'localhost'
table = 'student'
conf = {"hbase.zookeeper.quorum": host, "hbase.mapreduce.inputtable": table}
keyConv = "org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter"
valueConv = "org.apache.spark.examples.pythonconverters.HBaseResultToStringConverter"
hbase_rdd = sc.newAPIHadoopRDD("org.apache.hadoop.hbase.mapreduce.TableInputFormat","org.apache.hadoop.hbase.io.ImmutableBytesWritable","org.apache.hadoop.hbase.client.Result",keyConverter=keyConv,valueConverter=valueConv,conf=conf)
count = hbase_rdd.count()
hbase_rdd.cache()
output = hbase_rdd.collect()
for (k, v) in output:
print (k, v)
在
/mnt/data/spark-3.0.1-bin-hadoop2.7/目录下,终端中输入命令执行spark-sumbit任务运行test.py文件:
[root@big-data spark-3.0.1-bin-hadoop2.7]# ./bin/spark-submit \
> --master local[8] \
> /mnt/data/spark-3.0.1-bin-hadoop2.7/test/test.py
执行后可以查看到HBase数据已被成功读取。
热门跟贴