
autovia / ros_hadoop

License: Apache-2.0
Hadoop splittable InputFormat for ROS. Process rosbag files with Hadoop, Spark, and other HDFS-compatible systems.

Programming Languages

Scala
Dockerfile
Python

Projects that are alternatives of or similar to ros_hadoop

fsbrowser
Fast desktop client for Hadoop Distributed File System
Stars: ✭ 27 (-70.65%)
Mutual labels:  hadoop, hdfs
teraslice
Scalable data processing pipelines in JavaScript
Stars: ✭ 48 (-47.83%)
Mutual labels:  hadoop, hdfs
Bigdata docker
Big Data Ecosystem Docker
Stars: ✭ 161 (+75%)
Mutual labels:  hadoop, hdfs
Hdfs Shell
HDFS Shell is a HDFS manipulation tool to work with functions integrated in Hadoop DFS
Stars: ✭ 117 (+27.17%)
Mutual labels:  hadoop, hdfs
skein
A tool and library for easily deploying applications on Apache YARN
Stars: ✭ 128 (+39.13%)
Mutual labels:  hadoop, hdfs
Dynamometer
A tool for scale and performance testing of HDFS with a specific focus on the NameNode.
Stars: ✭ 122 (+32.61%)
Mutual labels:  hadoop, hdfs
kafka-connect-fs
Kafka Connect FileSystem Connector
Stars: ✭ 107 (+16.3%)
Mutual labels:  hadoop, hdfs
Wifi
A big data query and analysis system based on information captured over WiFi
Stars: ✭ 93 (+1.09%)
Mutual labels:  hadoop, hdfs
hive to es
A small tool for syncing Hive data warehouse data to Elasticsearch
Stars: ✭ 21 (-77.17%)
Mutual labels:  hadoop, hdfs
HDFS-Netdisc
A Hadoop-based distributed cloud storage system 🌴
Stars: ✭ 56 (-39.13%)
Mutual labels:  hadoop, hdfs
Ibis
A pandas-like deferred expression system, with first-class SQL support
Stars: ✭ 1,630 (+1671.74%)
Mutual labels:  hadoop, hdfs
datasqueeze
Hadoop utility to compact small files
Stars: ✭ 18 (-80.43%)
Mutual labels:  hadoop, hdfs
Bigdata Notes
A getting-started guide to big data ⭐
Stars: ✭ 10,991 (+11846.74%)
Mutual labels:  hadoop, hdfs
Spark With Python
Fundamentals of Spark with Python (using PySpark), code examples
Stars: ✭ 150 (+63.04%)
Mutual labels:  hadoop, hdfs
Repository
A personal learning knowledge base covering data warehouse modeling, real-time computing, big data, Java, algorithms, and more.
Stars: ✭ 92 (+0%)
Mutual labels:  hadoop, hdfs
docker-hadoop
Docker image for main Apache Hadoop components (Yarn/Hdfs)
Stars: ✭ 59 (-35.87%)
Mutual labels:  hadoop, hdfs
Learning Spark
Learn Spark from scratch; big data learning
Stars: ✭ 37 (-59.78%)
Mutual labels:  hadoop, hdfs
Camus
Mirror of Linkedin's Camus
Stars: ✭ 81 (-11.96%)
Mutual labels:  hadoop, hdfs
bigdata-doc
Big data study notes, learning roadmap, and organized technical case studies.
Stars: ✭ 37 (-59.78%)
Mutual labels:  hadoop, hdfs
wasp
WASP is a framework to build complex real-time big data applications. It relies on a kind of Kappa/Lambda architecture mainly leveraging Kafka and Spark. If you need to ingest huge amounts of heterogeneous data and analyze them through complex pipelines, this is the framework for you.
Stars: ✭ 19 (-79.35%)
Mutual labels:  hadoop, hdfs

ros_hadoop / RosbagInputFormat

Large-scale data processing for ROS using Hadoop, Spark, TensorFlow and more.

RosbagInputFormat is an open source splittable Hadoop InputFormat for the ROS bag file format.

The complete source code is available in the src/ folder and the jar file is generated using SBT (see build.sbt).

Workflow

  1. Download the latest release jar file and put it on your classpath.
  2. Extract the index configuration from your ROS bag file. The extracted index is a very small configuration file containing a protobuf array that is passed in the job configuration. Note that this operation does not process or parse the whole bag file; it simply seeks to the required offsets, e.g.
java -jar lib/rosbaginputformat.jar -f /opt/ros_hadoop/master/dist/HMB_4.bag
# will create an idx.bin config file /opt/ros_hadoop/master/dist/HMB_4.bag.idx.bin
  3. Put the ROS bag file in HDFS, e.g.
hdfs dfs -put
  4. Use it in your Spark jobs, e.g.
sc.newAPIHadoopFile(
    path =             "hdfs://127.0.0.1:9000/user/spark/HMB_4.bag",
    inputFormatClass = "io.autovia.foss.RosbagMapInputFormat",
    keyClass =         "org.apache.hadoop.io.LongWritable",
    valueClass =       "org.apache.hadoop.io.MapWritable",
    conf = {"RosbagInputFormat.chunkIdx":"/opt/ros_hadoop/master/dist/HMB_4.bag.idx.bin"})

Example data can be found, for instance, at https://github.com/udacity/self-driving-car/tree/master/datasets, published under the MIT License.

Want to Upgrade?

We also sell ros_spark and ros_go, which provide more features and a commercial-friendly license, and allow you to support high-quality open source development all at the same time. Please see the Autovia homepage for more details.

Documentation

The doc/ folder contains a Jupyter notebook with a few basic usage examples.

Tutorial

To test locally, use the Dockerfile.

To build an image using the Dockerfile, run the following in the shell. Please note that it will download Hadoop and Spark from their source URLs; the generated image is therefore relatively large (~5 GB).

docker build -t ros_hadoop:latest -f Dockerfile .

To start a container, use the following shell command in the ros_hadoop folder.

# $(pwd) will point to the ros_hadoop git clone folder
docker run -it -v $(pwd):/root/ros_hadoop -p 8888:8888 ros_hadoop

The container has a configured HDFS as well as Spark and the RosbagInputFormat jar. It leaves the user in a bash shell.

Point your browser to the local URL and enjoy the tutorial. The access token is printed in the docker container console.

Usage from Spark (pyspark)

Example data can be found, for instance, at https://github.com/udacity/self-driving-car/tree/master/datasets, published under the MIT License.

Check that the Rosbag file version is V2.0

java -jar lib/rosbaginputformat.jar --version -f /opt/ros_hadoop/master/dist/HMB_4.bag

Extract the index as configuration

The index is a very small configuration file containing a protobuf array that is passed in the job configuration. Note that this operation does not process or parse the whole bag file; it simply seeks to the required offsets.

# assuming you start the notebook in the doc/ folder
java -jar ../lib/rosbaginputformat.jar \
     -f /opt/ros_hadoop/master/dist/HMB_4.bag

hdfs dfs -ls

This will generate a very small file named HMB_4.bag.idx.bin in the same folder.

Copy the bag file to HDFS

Using your favorite tool, put the bag file in your working HDFS folder.

Note: keep the index file as configuration for your jobs; do not put small files in HDFS. For convenience we already provide an example file (/opt/ros_hadoop/master/dist/HMB_4.bag) in HDFS under /user/root/.

hdfs dfs -put /opt/ros_hadoop/master/dist/HMB_4.bag
hdfs dfs -ls

  • Hadoop InputFormat and Record Reader for Rosbag
  • Process Rosbag with Spark, Yarn, MapReduce, Hadoop Streaming API, …
  • Spark RDDs are cached and optimised for analysis

Process the ROS bag file in Spark using the RosbagInputFormat

Note: your HDFS address might differ.

fin = sc.newAPIHadoopFile(
    path =             "hdfs://127.0.0.1:9000/user/root/HMB_4.bag",
    inputFormatClass = "io.autovia.foss.RosbagMapInputFormat",
    keyClass =         "org.apache.hadoop.io.LongWritable",
    valueClass =       "org.apache.hadoop.io.MapWritable",
    conf = {"RosbagInputFormat.chunkIdx":"/opt/ros_hadoop/master/dist/HMB_4.bag.idx.bin"})
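
As a quick, purely illustrative sanity check (not part of the original notebook), you can count the records and peek at the first key/value pair; the key is the record's LongWritable converted to a Python int and the value a MapWritable converted to a dict:

# Optional sanity check: how many rosbag records were read, and what does one look like?
print(fin.count())
first_key, first_value = fin.take(1)[0]
print(first_key)                 # record key (int)
print(first_value['header'])     # record header as a Python dict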

Interpret the Messages

To interpret the messages we need the connections. We could also pass the connections as configuration; for now we collect them into a dictionary in the Spark driver and use it in the subsequent RDD actions.

Collect the connections from all Spark partitions of the bag file into the Spark driver

conn_a = fin.filter(
    lambda r: r[1]['header']['op'] == 7
).map(
    lambda r: r[1]
).collect()
conn_d = {str(k['header']['topic']): k for k in conn_a}

# see topic names
conn_d.keys()

From all ROS bag splits we collect into the Spark driver the connection messages (op=7 in the header), where the ROS message definitions are stored. This operation, of course, happens in parallel.

Load the python map functions from src/main/python/functions.py

%run -i ../src/main/python/functions.py

At the moment the file contains a single mapper function named msg_map.

Use of msg_map to apply a function on all messages

The Python rosbag package (rosbag.bag) needs to be installed on all Spark workers. The msg_map function (from src/main/python/functions.py) takes three arguments (a simplified sketch of such a mapper follows the list):

  1. r = the message or RDD record tuple
  2. func = a function (default str) to apply to the ROS message
  3. conn = a connection specifying which topic to process
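
For reference, the sketch below shows roughly what such a mapper can look like. It is an illustration rather than the authoritative implementation in src/main/python/functions.py; it assumes the connection record exposes the rosbag connection header fields ('type', 'message_definition', 'md5sum') under its 'data' key, and it relies on rosbag.bag._get_message_type, an internal rosbag helper.

# Simplified msg_map-style generator (illustrative; see src/main/python/functions.py
# for the real implementation shipped with ros_hadoop).
def msg_map_sketch(r, func=str, conn={}):
    from collections import namedtuple
    from rosbag.bag import _get_message_type

    # keep only data records (op == 2) that belong to the requested connection
    if r[1]['header']['op'] == 2 and r[1]['header']['conn'] == conn['header']['conn']:
        # build the minimal info object _get_message_type expects
        info = namedtuple('ConnectionInfo', ['datatype', 'msg_def', 'md5sum'])(
            str(conn['data']['type']),
            str(conn['data']['message_definition']),
            str(conn['data']['md5sum']))
        msg = _get_message_type(info)()   # generate the message class and instantiate it
        msg.deserialize(r[1]['data'])     # fill it from the raw record bytes
        yield func(msg)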

%matplotlib nbagg
# use %matplotlib notebook in python3
from functools import partial
import pandas as pd
import numpy as np


# Take messages from '/imu/data' topic using default str func
rdd = fin.flatMap(
    partial(msg_map, conn=conn_d['/imu/data'])
)

The connection dictionary is sent over the closure to the workers, which use it in msg_map.

print(rdd.take(1)[0])
header:
  seq: 1701626
  stamp:
    secs: 1479425728
    nsecs: 747487068
  frame_id: /imu
orientation:
  x: -0.0251433756238
  y: 0.0284643176884
  z: -0.0936542998233
  w: 0.994880191333
orientation_covariance: [0.017453292519943295, 0.0, 0.0, 0.0, 0.017453292519943295, 0.0, 0.0, 0.0, 0.15707963267948966]
angular_velocity:
  x: 0.0
  y: 0.0
  z: 0.0
angular_velocity_covariance: [-1.0, -1.0, -1.0, -1.0, -1.0, -1.0, -1.0, -1.0, -1.0]
linear_acceleration:
  x: 1.16041922569
  y: 0.595418334007
  z: 10.7565326691
linear_acceleration_covariance: [0.0004, 0.0, 0.0, 0.0, 0.0004, 0.0, 0.0, 0.0, 0.0004]

Image data from camera messages

An example of taking messages using a func other than the default str. In this case we apply a lambda to messages from the '/center_camera/image_color/compressed' topic. As usual with Spark, the operation happens in parallel on all workers.

from PIL import Image
from io import BytesIO

res = fin.flatMap(
    partial(msg_map, func=lambda r: r.data, conn=conn_d['/center_camera/image_color/compressed'])
).take(50)

Image.open(BytesIO(res[48]))

Plot fuel level

The topic /vehicle/fuel_level_report contains 2215 ROS messages. Let us plot the header.stamp in seconds vs. fuel_level using a pandas dataframe.

def f(msg):
    return (msg.header.stamp.secs, msg.fuel_level)

d = fin.flatMap(
    partial(msg_map, func=f, conn=conn_d['/vehicle/fuel_level_report'])
).toDF().toPandas()

d.set_index('_1').plot()

Machine Learning models on Spark workers

A dot-product Keras "model" is applied to each message from a topic. We will compare its result with the dot product computed with numpy.

Note that the imports happen on the workers and not in the driver. The connection dictionary, on the other hand, is sent over the closure.

def f(msg):
    from keras.layers import dot, Dot, Input
    from keras.models import Model

    linear_acceleration = {
        'x': msg.linear_acceleration.x,
        'y': msg.linear_acceleration.y,
        'z': msg.linear_acceleration.z,
    }

    linear_acceleration_covariance = np.array(msg.linear_acceleration_covariance)

    i1 = Input(shape=(3,))
    i2 = Input(shape=(3,))
    o = dot([i1,i2], axes=1)

    model = Model([i1,i2], o)

    # return a tuple with (numpy dot product, keras dot "predict")
    return (
        np.dot(linear_acceleration_covariance.reshape(3,3),
               [linear_acceleration['x'], linear_acceleration['y'], linear_acceleration['z']]),
        model.predict([
            np.array([[ linear_acceleration['x'], linear_acceleration['y'], linear_acceleration['z'] ]]),
            linear_acceleration_covariance.reshape((3,3))])
    )

fin.flatMap(partial(msg_map, func=f, conn=conn_d['/vehicle/imu/data_raw'])).take(5)

# tuple with (numpy dot product, keras dot "predict")

One can of course sample and collect the data in the driver to train a model on a single machine. Note that the msg is the most granular unit, but you could replace the flatMap with a mapPartitions to apply such a Keras function to a whole split, as sketched below.
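
A minimal sketch of that mapPartitions variant, reusing msg_map and the conn_d dictionary from above (illustrative only; here each split just counts its '/imu/data' messages, but the same spot is where a per-split Keras model could be fitted or applied):

# Process a whole split at once instead of one message at a time.
def per_split(records):
    msgs = [m
            for r in records
            for m in msg_map(r, func=lambda x: x, conn=conn_d['/imu/data'])]
    # a per-split model could be trained or applied on msgs here; we only count them
    yield len(msgs)

fin.mapPartitions(per_split).collect()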

Another option would be to insert a map/reduceByKey step before the flatMap so that the function argument would be a whole time interval instead of a single msg. The idea is to key on time, as in the sketch below.
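
A hedged sketch of that idea: emit (second, value) pairs through msg_map and group on the whole-second timestamp so that downstream functions see one interval at a time. The topic and the aggregation below are purely illustrative.

# Key each '/imu/data' message on its whole-second timestamp, then group so that
# each value downstream is the collection of readings for one second of driving.
def mean(values):
    values = list(values)
    return sum(values) / len(values)

by_second = fin.flatMap(
    partial(msg_map,
            func=lambda m: (m.header.stamp.secs, m.linear_acceleration.z),
            conn=conn_d['/imu/data'])
).groupByKey()

# e.g. average vertical acceleration per one-second interval
by_second.mapValues(mean).take(5)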
