Spark-HBase Connector

Build status

This library lets your Apache Spark application interact with Apache HBase using a simple and elegant API.

If you want to read and write data to HBase, you no longer need to use the Hadoop API; you can just use Spark.

Including the library

The spark-hbase-connector is available in the Sonatype repository. You can just add the following dependency in sbt:

libraryDependencies += "it.nerdammer.bigdata" % "spark-hbase-connector_2.10" % "1.0.3"

The Maven style version of the dependency is:

<dependency>
  <groupId>it.nerdammer.bigdata</groupId>
  <artifactId>spark-hbase-connector_2.10</artifactId>
  <version>1.0.3</version>
</dependency>

If you don't like sbt or Maven, you can also check out this GitHub repo and execute the following command from the root folder:

sbt package

SBT will create the library jar under target/scala-2.10.

Note that the library depends on the following artifacts:

libraryDependencies +=  "org.apache.spark" % "spark-core_2.10" % "1.6.0" % "provided"

libraryDependencies +=  "org.apache.hbase" % "hbase-common" % "1.0.3" excludeAll(ExclusionRule(organization = "javax.servlet", name="javax.servlet-api"), ExclusionRule(organization = "org.mortbay.jetty", name="jetty"), ExclusionRule(organization = "org.mortbay.jetty", name="servlet-api-2.5"))

libraryDependencies +=  "org.apache.hbase" % "hbase-client" % "1.0.3" excludeAll(ExclusionRule(organization = "javax.servlet", name="javax.servlet-api"), ExclusionRule(organization = "org.mortbay.jetty", name="jetty"), ExclusionRule(organization = "org.mortbay.jetty", name="servlet-api-2.5"))

libraryDependencies +=  "org.apache.hbase" % "hbase-server" % "1.0.3" excludeAll(ExclusionRule(organization = "javax.servlet", name="javax.servlet-api"), ExclusionRule(organization = "org.mortbay.jetty", name="jetty"), ExclusionRule(organization = "org.mortbay.jetty", name="servlet-api-2.5"))


libraryDependencies +=  "org.scalatest" % "scalatest_2.10" % "2.2.4" % "test"

Also check whether the current branch is passing all tests in Travis CI before checking out the code (see the build badge above).

Setting the HBase host

The HBase Zookeeper quorum host can be set in multiple ways.

(1) Passing the host to the spark-submit command:

spark-submit --conf spark.hbase.host=thehost ...

(2) Using the hbase-site.xml file (in the root of your jar, i.e. src/main/resources/hbase-site.xml):

<?xml version="1.0" encoding="UTF-8"?>
<configuration>
	<property>
		<name>hbase.zookeeper.quorum</name>
		<value>thehost</value>
	</property>
	
	<!-- Put any other property here, it will be used -->
</configuration>

(3) If you have access to the JVM parameters:

java -Dspark.hbase.host=thehost -jar ....

(4) Using Scala code:

val sparkConf = new SparkConf()
...
sparkConf.set("spark.hbase.host", "thehost")
...
val sc = new SparkContext(sparkConf)

or you can set it directly on the SparkContext's Hadoop configuration:

sparkContext.hadoopConfiguration.set("spark.hbase.host", "thehost")

Writing to HBase (Basic)

Writing to HBase is very easy. Remember to import the implicit conversions:

import it.nerdammer.spark.hbase._

You just need an RDD to write, such as the following one:

val rdd = sc.parallelize(1 to 100)
            .map(i => (i.toString, i+1, "Hello"))

This RDD is made of tuples like ("1", 2, "Hello") or ("27", 28, "Hello"). The first element of each tuple is taken as the row id; the others are assigned to columns.

rdd.toHBaseTable("mytable")
    .toColumns("column1", "column2")
    .inColumnFamily("mycf")
    .save()

You are done. HBase now contains 100 rows in table mytable, each row containing two values for columns mycf:column1 and mycf:column2.

Reading from HBase (Basic)

Reading from HBase is easier. Remember to import the implicit conversions:

import it.nerdammer.spark.hbase._

If you want to read the data written in the previous example, you just need to write:

val hBaseRDD = sc.hbaseTable[(String, Int, String)]("mytable")
    .select("column1", "column2")
    .inColumnFamily("mycf")

Now hBaseRDD contains all the data found in the table. Each object in the RDD is a tuple containing (in order) the row id, the corresponding value of column1 (Int) and column2 (String).
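
For example, you can collect and print the contents as a quick sanity check (plain Spark, nothing connector-specific):

hBaseRDD.collect().foreach {
    case (id, column1, column2) => println(s"row=$id, column1=$column1, column2=$column2")
}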

If you don't want the row id and only want the columns, just remove the first element from the tuple spec:

val hBaseRDD = sc.hbaseTable[(Int, String)]("mytable")
    .select("column1", "column2")
    .inColumnFamily("mycf")

This way, only the columns that you have chosen will be selected.

You don't have to prefix column names with the column family passed to inColumnFamily(COLUMN_FAMILY_NAME); columns belonging to other families must be prefixed with their family name and a colon (:), as in the following example:

val hBaseRDD = sc.hbaseTable[(Int, String, String)]("mytable")
    .select("column1","columnfamily2:column2","columnfamily3:column3")
    .inColumnFamily("columnfamily1")

Other Topics

Filtering

It is possible to filter the results by row key ranges. Filtering also supports additional salting prefixes (see the Salting section below).

val rdd = sc.hbaseTable[(String, String)]("table")
      .select("col")
      .inColumnFamily(columnFamily)
      .withStartRow("00000")
      .withStopRow("00500")

The example above retrieves all rows having a row key greater than or equal to 00000 and lower than 00500. The options withStartRow and withStopRow can also be used separately, as shown below.
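
For instance, the following variant uses only a lower bound (a sketch based on the same hypothetical table):

val fromRdd = sc.hbaseTable[(String, String)]("table")
      .select("col")
      .inColumnFamily(columnFamily)
      .withStartRow("00000")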

Managing Empty Columns

Empty columns are managed by using Option[T] types:

val rdd = sc.hbaseTable[(Option[String], String)]("table")
      .select("column1", "column2")
      .inColumnFamily(columnFamily)

rdd.foreach(t => {
    if(t._1.nonEmpty) println(t._1.get)
})

You can use the Option[T] type every time you are not sure whether a given column is present in your HBase RDD.
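
For example, a missing value can be replaced with a default using plain Scala Option handling (this is not a connector feature):

val withDefaults = rdd.map {
    case (maybeColumn1, column2) => (maybeColumn1.getOrElse(""), column2)
}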

Using different column families

Different column families can be used both when reading and when writing an RDD.
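
For instance, assume data is an RDD of tuples whose first element is the row id (a hypothetical sample, built like the earlier write example):

val data = sc.parallelize(1 to 100)
      .map(i => (i.toString, "value1", "value2"))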

data.toHBaseTable("mytable")
      .toColumns("column1", "cf2:column2")
      .inColumnFamily("cf1")
      .save()

In the example above, cf1 refers only to column1, because cf2:column2 is already fully qualified.

val count = sc.hbaseTable[(String, String)]("mytable")
      .select("cf1:column1", "column2")
      .inColumnFamily("cf2")
      .count

In the reading example above, the default column family cf2 applies only to column2.

Usage in Spark Streaming

The connector can be used in Spark Streaming applications with the same API.

// stream is a DStream[(Int, Int)]

stream.foreachRDD(rdd =>
    rdd.toHBaseTable("table")
      .inColumnFamily("cf")
      .toColumns("col1")
      .save()
    )
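
A more complete sketch, assuming a hypothetical socket text source on localhost:9999 that emits lines such as 1,42 (the source and the parsing are illustrative only):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import it.nerdammer.spark.hbase._

val sparkConf = new SparkConf().setAppName("hbase-streaming-example")
val ssc = new StreamingContext(sparkConf, Seconds(10))

// Parse "id,value" lines into (Int, Int) tuples; the first element becomes the row id
val stream = ssc.socketTextStream("localhost", 9999)
      .map(_.split(","))
      .map(fields => (fields(0).toInt, fields(1).toInt))

stream.foreachRDD(rdd =>
    rdd.toHBaseTable("table")
      .inColumnFamily("cf")
      .toColumns("col1")
      .save()
    )

ssc.start()
ssc.awaitTermination()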

HBase RDD as a Spark DataFrame

You can convert hBaseRDD to a Spark DataFrame to apply further Spark transformations or to move the data into other stores, such as MongoDB or Hive.

val hBaseRDD = sparkContext.hbaseTable[(Option[String], Option[String], Option[String], Option[String], Option[String])](HBASE_TABLE_NAME)
      .select("column1", "column2", "column3", "column4", "column5")
      .inColumnFamily(HBASE_COLUMN_FAMILY)

Map over hBaseRDD to create an org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] (i.e. RDD[Row]):

import org.apache.spark.sql.Row

// Note: .get assumes that every column is non-empty; use getOrElse to supply a default otherwise
val rowRDD = hBaseRDD.map(i => Row(i._1.get, i._2.get, i._3.get, i._4.get, i._5.get))

Create a schema for the RDD[Row] above:

import org.apache.spark.sql.types._

object myschema {
      val column1 = StructField("column1", StringType)
      val column2 = StructField("column2", StringType)
      val column3 = StructField("column3", StringType)
      val column4 = StructField("column4", StringType)
      val column5 = StructField("column5", StringType)
      val struct = StructType(Array(column1, column2, column3, column4, column5))
    }

Create a Spark DataFrame from the RDD[Row] and the schema:

val myDF = sqlContext.createDataFrame(rowRDD, myschema.struct)

Now you can apply any Spark transformations and actions. For example:

myDF.show()

This displays the DataFrame's data in a tabular format.

SparkSQL on HBase

Since the previous example converted hBaseRDD into a Spark DataFrame, you can now apply Spark SQL to it.

Create a temporary table in Spark:

myDF.registerTempTable("mytable")

Run Spark SQL against the temporary table:

sqlContext.sql("SELECT * FROM mytable").show()

Advanced

Salting Prefixes

Salting is supported in reads and writes. Only string-valued row ids are supported at the moment, so salting prefixes should also be of String type.

sc.parallelize(1 to 1000)
      .map(i => (pad(i.toString, 5), "A value"))
      .toHBaseTable(table)
      .inColumnFamily(columnFamily)
      .toColumns("col")
      .withSalting((0 to 9).map(s => s.toString))
      .save()

In the example above, each row id is composed of 5 digits, from 00001 to 01000. The salting property adds a random digit in front, so you will get records like 800001, 600031, and so on.
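
The pad function used above is not part of the connector; a minimal helper could look like this:

// Left-pads a numeric string with zeros, e.g. pad("7", 5) returns "00007"
def pad(str: String, length: Int): String =
    "0" * (length - str.length) + str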

When reading the RDD, you just need to declare the salting used in the table and ignore it when specifying bounds (startRow or stopRow). The library takes care of the salting.

val rdd = sc.hbaseTable[String](table)
      .select("col")
      .inColumnFamily(columnFamily)
      .withStartRow("00501")
      .withSalting((0 to 9).map(s => s.toString))

Custom Mapping with Case Classes

Custom mapping can be used in place of the default tuple-mapping technique. Just define a case class for your type:

case class MyData(id: Int, prg: Int, name: String)

then define implicit writer and reader instances for your type:

implicit def myDataWriter: FieldWriter[MyData] = new FieldWriter[MyData] {
    override def map(data: MyData): HBaseData =
      Seq(
        Some(Bytes.toBytes(data.id)),    // the first element is used as the row id
        Some(Bytes.toBytes(data.prg)),
        Some(Bytes.toBytes(data.name))
      )

    override def columns = Seq("prg", "name")
}

Do not forget to override the columns method.

Then, you can define an implicit reader:

implicit def myDataReader: FieldReader[MyData] = new FieldReader[MyData] {
    override def map(data: HBaseData): MyData = MyData(
      id = Bytes.toInt(data.head.get),
      prg = Bytes.toInt(data.drop(1).head.get),
      name = Bytes.toString(data.drop(2).head.get)
    )

    override def columns = Seq("prg", "name")
}

Once you are done, make sure that the implicits are imported where they are used and that they do not produce a non-serializable task (Spark will check this at runtime).

You can now use your converters easily:

val data = sc.parallelize(1 to 100).map(i => new MyData(i, i, "Name" + i.toString))
// data is an RDD[MyData]

data.toHBaseTable("mytable")
  .inColumnFamily("mycf")
  .save()

val read = sc.hbaseTable[MyData]("mytable")
  .inColumnFamily("mycf")

The converters above are low level and use the HBase API directly. Since this connector already provides many predefined converters for simple and complex types, you will probably want to reuse them. The FieldReaderProxy and FieldWriterProxy APIs were created for this purpose.

High-level converters using FieldWriterProxy

You can create a new FieldWriterProxy by declaring a conversion from your custom type to a predefined type. In this case, the predefined type is a tuple of three basic fields:

// MySimpleData is a case class, e.g.:
case class MySimpleData(id: Int, prg: Int, name: String)

implicit def myDataWriter: FieldWriter[MySimpleData] = new FieldWriterProxy[MySimpleData, (Int, Int, String)] {

  override def convert(data: MySimpleData) = (data.id, data.prg, data.name) // the first element is the row id

  override def columns = Seq("prg", "name")
}

The corresponding FieldReaderProxy converts a tuple of three basic fields back into objects of class MySimpleData:

implicit def myDataReader: FieldReader[MySimpleData] = new FieldReaderProxy[(Int, Int, String), MySimpleData] {

  override def columns = Seq("prg", "name")

  override def convert(data: (Int, Int, String)) = MySimpleData(data._1, data._2, data._3)
}

Note that we have not used the HBase API here. Currently, the proxy converters support tuples of up to 22 fields (including the row id).
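
Usage is then the same as with the low-level converters (a sketch, assuming the proxy implicits above are in scope; the table name is illustrative):

val simpleData = sc.parallelize(1 to 100).map(i => MySimpleData(i, i, "Name" + i.toString))

simpleData.toHBaseTable("simpletable")
  .inColumnFamily("mycf")
  .save()

val readBack = sc.hbaseTable[MySimpleData]("simpletable")
  .inColumnFamily("mycf")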
