Big Data Analysis with Hadoop, Spark, and R Shiny

Andrew (he/him) - Nov 21 '18 - - Dev Community

The Apache Software Foundation's Hadoop and Spark projects comprise the core of what is probably the most popular open-source Big Data analysis pipeline today. Hadoop's distributed file system, HDFS, breaks files into chunks and replicates those chunks across commodity hardware, resulting in a cheap, scalable, fault-tolerant data storage solution. Spark can then analyse those data chunks in-place, reducing network and I/O latency, with a speedup of 100x over Hadoop's built-in analysis framework, MapReduce.

But sometimes, you want to present a nice graphical user interface (GUI) so users who aren't as tech savvy can still access and analyse your Big Data stores. There are all sorts of applications for this -- real-time data analysis and modelling of manufacturing processes; discovering daily, weekly, or seasonal trends in consumer or user data; or analyzing scientific data to make live interactive plots using plotly. In this post, I'll show you how to read and analyze distributed data stored on HDFS using Spark, and present those results in R.

(Note: This post assumes that you have HDFS / Spark up and running on your machine. If not, check out my previous post on Installing and Running Hadoop and Spark on Windows.)

The SparkR Shell

Getting R to talk to HDFS through Spark is very easy with the SparkR shell. From the command like, just run:

C:\Users\andrew>sparkR

R version 3.5.1 (2018-07-02) -- "Feather Spray"
Copyright (C) 2018 The R Foundation for Statistical Computing
Platform: x86_64-w64-mingw32/x64 (64-bit)

...

Spark package found in SPARK_HOME: C:\Spark\spark-2.3.2-bin-hadoop2.7
Launching java with spark-submit command C:\Spark\spark-2.3.2-bin-hadoop2.7/bin/spark-submit2.cmd   "sparkr-shell"  C:\Users\andrew\AppData\Local\Temp\RtmpuMdeVg\backend_port2664d055020
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).

 Welcome to
    ____              __
   / __/__  ___ _____/ /__
  _\ \/ _ \/ _ `/ __/  '_/
 /___/ .__/\_,_/_/ /_/\_\   version  2.3.2
    /_/


 SparkSession available as 'spark'.
>
Enter fullscreen mode Exit fullscreen mode

This is an R shell with access to the HDFS. Try running some R commands:

> x <- c(1,2,3)
> x
[1] 1 2 3
Enter fullscreen mode Exit fullscreen mode

Text Files

If your HDFS is set up correctly, you should be able to access plain text files right away using the read.df() method:

> txt <- read.df("hdfs://localhost:9000/example_data/example_text.md", "text")
> txt
SparkDataFrame[value:string]
> head(txt)
                                                                           value
1                                                                 # Apache Spark
2
3 Spark is a fast and general cluster computing system for Big Data. It provides
4    high-level APIs in Scala, Java, Python, and R, and an optimized engine that
5      supports general computation graphs for data analysis. It also supports a
6     rich set of higher-level tools including Spark SQL for SQL and DataFrames,
>
Enter fullscreen mode Exit fullscreen mode

Note that this file is located on my HDFS at /example_data/example_text.md. This is confirmed by running hadoop fs -ls at the cmd prompt:

C:\Users\andrew>hadoop fs -ls /example_data
Found 5 items
-rw-r--r--   1 andrew supergroup       2848 2018-11-19 16:23 /example_data/example_csv.csv
-rw-r--r--   1 andrew supergroup       7882 2018-11-19 16:23 /example_data/example_sql_linux.txt
-rw-r--r--   1 andrew supergroup      13958 2018-11-19 16:23 /example_data/example_sql_windows.txt
-rw-r--r--   1 andrew supergroup       3809 2018-11-16 16:35 /example_data/example_text.md
-rw-r--r--   1 andrew supergroup      14381 2018-11-19 16:24 /example_data/example_xlsx.xlsx
Enter fullscreen mode Exit fullscreen mode

The hostname (localhost for me) and port number (9000 for me) are specific to your HDFS setup, but these are the current default settings for a standalone installation.

CSV Files

CSV files can be read with read.df() as well, but you have to set source equal to "csv", rather than "text":

> csv <- read.df("hdfs://localhost:9000/example_data/example_csv.csv", "csv")
> csv
SparkDataFrame[_c0:string, _c1:string, _c2:string, _c3:string, _c4:string]
> head(csv)
                  _c0               _c1               _c2                _c3
1                Time       dissolvedO2                pH        Temperature
2 2018-01-01 15:00:10 49.56497432219166 7.056500932431841 36.952017501071516
3 2018-01-01 15:00:40 49.04355394077128 7.056606732537641  36.94562695468097
...
Enter fullscreen mode Exit fullscreen mode

Notice that we have column headers, above, that weren't interpreted correctly. To fix this, there's another option for read.df(), called header, which we need to set to TRUE (or just T for short):

> csv <- read.df("hdfs://localhost:9000/example_data/example_csv.csv", "csv", header=T)
> csv
SparkDataFrame[Time:string, dissolvedO2:string, pH:string, Temperature:string, AgitatorSpeed:string]
> head(csv)
                 Time       dissolvedO2                 pH        Temperature
1 2018-01-01 15:00:10 49.56497432219166  7.056500932431841 36.952017501071516
2 2018-01-01 15:00:40 49.04355394077128  7.056606732537641  36.94562695468097
3 2018-01-01 15:01:10  49.7866879539053  7.056590932521841  36.98149607055008
...
>
Enter fullscreen mode Exit fullscreen mode

Also, notice that both csv and txt are SparkDataFrame objects. A SparkDataFrame is a distributed collection of data which SparkR can access and analyse in its distributed form across the HDFS cluster. This means that the data isn't read into the R session, rather, it's treated exactly the way it would be if you ran Scala or Java commands in the Spark shell itself.

SQL Tables

Importing SQL Tables into HDFS

In a previous post, I explained how to copy data files from the local filesystem into HDFS using the hadoop fs -put command, but importing SQL tables is a bit more involved. I'm going to start by assuming that you have MySQL set up and you have a table available in some database:

C:\Users\andrew>mysql -u root -p
Enter password: ***********
...

mysql> show databases;
+--------------------+
| Database           |
+--------------------+
| information_schema |
| mysql              |
| performance_schema |
| siemens            |
| sys                |
+--------------------+
5 rows in set (0.00 sec)

mysql> use siemens;
Database changed
mysql> show tables;
+-------------------+
| Tables_in_siemens |
+-------------------+
| simulate          |
+-------------------+
1 row in set (0.00 sec)
Enter fullscreen mode Exit fullscreen mode

We're going to use Apache Sqoop to import the simulate table into HDFS:

  1. Download the most recent version of Sqoop. (I downloaded the binary *.tar.gz file.) Unpack the file and move it into C:\Sqoop.
  2. Update your system environment variables and add SQOOP_HOME as the directory which you just unpacked into C:\Sqoop (it should be something like C:\Sqoop\sqoop-1.4.7.bin__hadoop-2.6.0):

    And add Sqoop to your %PATH% by appending %SQOOP_HOME%\bin:

  3. Verify the installation by opening a new cmd window and typing

C:\Users\andrew>sqoop version
...
2018-11-21 13:20:29,408 INFO sqoop.Sqoop: Running Sqoop version: 1.4.7
Sqoop 1.4.7
git commit id 2328971411f57f0cb683dfb79d19d4d19d185dd8
Compiled by maugli on Thu Dec 21 15:59:58 STD 2017
Enter fullscreen mode Exit fullscreen mode

Great! Now, we need to make sure that Sqoop can talk to MySQL through the Java Database Connectivity API (JDBC). To do that, we need to download Connector/J, the official JDBC driver for MySQL. I downloaded the platform-independent *.zip file (currently mysql-connector-java-5.1.47.zip) and unzipped it into C:\Program Files\MySQL\ for safekeeping. Then, you'll need to copy C:\Program Files\MySQL\mysql-connector-java-5.1.47\mysql-connector-java-5.1.47.jar into C:\Sqoop\sqoop-1.4.7.bin__hadoop-2.6.0\lib\ (making sure to account for version numbers, which might be different).

The final thing to do is to download the Cloudera Sqoop Java library. (I got mine from here.) This should be named something like sqoop-1.4.2-hadoop20.jar. Put this jar into C:\Spark\spark-2.3.2-bin-hadoop2.7\jars. Then, open a cmd window with Administrator permissions. You should now be able to import an SQL table into HDFS.

I'm going to import this SQL table as a Parquet File, but there are other options available, including importing as SequenceFiles, Avro Data files, or just as plain text. The command to import is rather long:

C:\Users\andrew> sqoop import --connect "jdbc:mysql://localhost:3306/Siemens" --table "simulate" --username "root" --password "<password here>" -m 1 --target-dir "hdfs://localhost:9000/big_data/example_SQL" --as-parquetfile --bindir sqoop_jars
Enter fullscreen mode Exit fullscreen mode

Note that my MySQL database is also hosted on localhost and the port is the default MySQL port number, 3306. The database in my case is called Siemens and the table, as noted above, is simulate. I connect to MySQL as root, type out my MySQL password, set -m 1 (the number of "map" tasks for a parallel import). I want the database to show up under /big_data/example_SQL in my HDFS as a Parquet File, and I want any generated *.class or *.jar files to be dumped to C:\Users\andrew\sqoop_jars.

Note that you can also use an "options" file to pass arguments to sqoop import so you don't have to write out your password as plaintext to the terminal (which is unsafe).

We can verify that the table was correctly imported to HDFS with the following command:

    C:\Users\andrew>hadoop fs -ls -R /big_data
    ...
    drwxr-xr-x   - andrew supergroup          0 2018-11-19 17:46 /big_data/example_SQL
    drwxr-xr-x   - andrew supergroup          0 2018-11-19 17:46 /big_data/example_SQL/.metadata
    -rw-r--r--   1 andrew supergroup        174 2018-11-19 17:46 /big_data/example_SQL/.metadata/descriptor.properties
    -rw-r--r--   1 andrew supergroup       2350 2018-11-19 17:46 /big_data/example_SQL/.metadata/schema.avsc
    drwxr-xr-x   - andrew supergroup          0 2018-11-19 17:46 /big_data/example_SQL/.metadata/schemas
    -rw-r--r--   1 andrew supergroup       2350 2018-11-19 17:46 /big_data/example_SQL/.metadata/schemas/1.avsc
    drwxr-xr-x   - andrew supergroup          0 2018-11-19 17:46 /big_data/example_SQL/.signals
    -rw-r--r--   1 andrew supergroup          0 2018-11-19 17:46 /big_data/example_SQL/.signals/unbounded
    -rw-r--r--   1 andrew supergroup     118032 2018-11-19 17:46 /big_data/example_SQL/684c9b31-892c-48c5-8574-9dd61a9d7e78.parquet
Enter fullscreen mode Exit fullscreen mode

Beautiful! Finally, we can read this SQL table from HDFS. Note that importing as a Parquet File means that we have schemas. This is one advantage over importing as some other file type -- Parquet Files maintain a database schema with data types and so on, so we don't need to re-parse these later, which saves time.

Reading SQL Tables from HDFS

Assuming that you've got your SQL table into HDFS as outlined in the previous subsection, we can now read it from the SparkR shell. Simply run:

C:\Users\andrew>sparkR
...

> sql <- read.df("hdfs://localhost:9000/big_data/example_SQL/684c9b31-892c-48c5-8574-9dd61a9d7e78.parquet", "parquet")
> sql
SparkDataFrame[Time:bigint, dissolvedO2:double, pH:double, outletN2:double, outletCO2:double, outletO2:double, outletAR:double, AgitatorSpeed:double, Temperature:double, Mannose:double, Osmolality:double, Yield:double, Afuco_glycans:double, Galacto_glycans:double, Viability:double]
> head(sql)
          Time dissolvedO2       pH outletN2 outletCO2 outletO2  outletAR
1 1.514819e+12    49.56497 7.056501 60.14372  10.32145 28.80335 0.7314709
2 1.514819e+12    49.26843 7.056541 60.13668  10.32402 28.82944 0.7305011
3 1.514819e+12    49.50221 7.056421 60.09855  10.31529 28.77520 0.7297820
...
>
Enter fullscreen mode Exit fullscreen mode

This again works exactly the same way as it did for plain text files and CSV files, only we had to specify source="parquet".

Any Other Kind of File

Literally any other kind of file can also be read from HDFS using the R package curl. I found myself wanting to read Excel files from HDFS. Here's now I did it. First, include whatever libraries you need to read the files, as well as the curl library:

> library(curl)
> library(xlsx)
Enter fullscreen mode Exit fullscreen mode

Fetch the file using http://. Note that your port here is different than the one used above for hdfs://, and is actually the same port which can be used in the browser to manage the HDFS cluster:

Any file accessed on the HDFS through http:// must have /webhdfs/v1 appended to it. So, for example, /example_data/example_xlsx.xlsx would become /webhdfs/v1/example_data/example_xlsx.xlsx. Finally, we need to append ?op=OPEN to the end of the URI so curl knows that we want to open the file and read it:

> curlfile <- curl_fetch_memory("http://localhost:9870/webhdfs/v1/example_data/example_xlsx.xlsx?op=OPEN")
> tmpfile <- tempfile("name_of_temp_file")
> tt <- file(tmpfile, "wb")
> writeBin(curlfile$content, tt)
> close(tt)
Enter fullscreen mode Exit fullscreen mode

And that's it! We now have a temporary file (on Windows, these are saved in some subdirectory of C:\tmp) which we can read using the appropriate reader:

> xlsx <- createDataFrame(read.xlsx(tmpfile, sheetIndex=1))
> xlsx
SparkDataFrame[Time:timestamp, dissolvedO2:double, pH:double, outletN2:double, outletCO2:double, outletO2:double, outletAR:double, AgitatorSpeed:double, Temperature:double, Mannose:double, Osmolality:double, Yield:double, Afuco_glycans:double, Galacto_glycans:double, Viability:double]
> head(xlsx)
                 Time dissolvedO2       pH outletN2 outletCO2 outletO2
1 2018-01-01 15:00:00    49.56497 7.056501 60.14372  10.32145 28.80335
2 2018-01-01 15:00:30    49.04355 7.056607 60.21205  10.38589 28.80089
3 2018-01-01 15:01:00    49.78669 7.056591 60.24343  10.37360 28.84372
Enter fullscreen mode Exit fullscreen mode

Again, notice that both the SQL Parquet File and this Excel file are imported as SparkDataFrame objects! (Note: to the best of my knowledge, there doesn't seem to be a way to read Excel files into SparkR directly from HDFS. They must be downloaded as temporary files to the local filesystem. But assuming that most of your data is in CSV, text, or SQL-as-Parquet Files, you can still distribute most of your analysis tasks.)

Spark in RStudio

You can also execute any of the above commands in the plain R shell or even RStudio. All you have to do is tell R where the SparkR library is located. This should be in your SPARK_HOME directory. For me, this is:

C:\Users\andrew>echo %SPARK_HOME%
C:\Spark\spark-2.3.2-bin-hadoop2.7
Enter fullscreen mode Exit fullscreen mode

So I can open the plain R shell and get my SQL table from above with:

C:\Users\andrew>R --no-restore --no-save
...

> library(SparkR, lib.loc = c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib")))
...

> sparkR.session(master = "local[*]", sparkConfig = list(spark.driver.memory="4g"))
Spark package found in SPARK_HOME: C:\Spark\spark-2.3.2-bin-hadoop2.7
Launching java with spark-submit command C:\Spark\spark-2.3.2-bin-hadoop2.7/bin/spark-submit2.cmd   --driver-memory "64g" sparkr-shell
...

> sql <- read.df("hdfs://localhost:9000/big_data/example_SQL/684c9b31-892c-48c5-8574-9dd61a9d7e78.parquet", "parquet")

> sql
SparkDataFrame[Time:bigint, dissolvedO2:double, pH:double, outletN2:double, outletCO2:double, outletO2:double, outletAR:double, AgitatorSpeed:double, Temperature:double, Mannose:double, Osmolality:double, Yield:double, Afuco_glycans:double, Galacto_glycans:double, Viability:double]

> head(sql)
          Time dissolvedO2       pH outletN2 outletCO2 outletO2  outletAR
1 1.514819e+12    49.56497 7.056501 60.14372  10.32145 28.80335 0.7314709
2 1.514819e+12    49.26843 7.056541 60.13668  10.32402 28.82944 0.7305011
3 1.514819e+12    49.50221 7.056421 60.09855  10.31529 28.77520 0.7297820
Enter fullscreen mode Exit fullscreen mode

It's exactly the same! So how can we use this to help build a data analysis GUI in R?

R Shiny

Let's set up a simple R Shiny app to connect to this database. Create two files called server.R and ui.R and paste the following code into them:

server.R

library(shiny)

# Connect to Spark outside the shinyServer() method
library(SparkR, lib.loc = c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib")))
sparkR.session(master = "local[*]", sparkConfig = list(spark.driver.memory="4g"))

# Define server logic
shinyServer(function(input, output) {

  output$myTable <- renderDT(

    # get the SparkDataFrame from HDFS; collect() converts SparkDataFrame -> data.frame
    collect(read.df(paste("hdfs://localhost:9000/", input$path, sep=""), input$type))

  )

})
Enter fullscreen mode Exit fullscreen mode

ui.R

library(shiny)
library(DT)

# Define UI for application that draws a data table
shinyUI(fluidPage(

  # Application title
  titlePanel("Simple SparkDataFrame Example"),

  # Sidebar with a text input for the filename 
  sidebarLayout(
    sidebarPanel(
      textInput("path",
                "HDFS File Path:",
                "/big_data/example_SQL/684c9b31-892c-48c5-8574-9dd61a9d7e78.parquet"
      ),
      textInput("type",
                "HDFS File Type:",
                "parquet"
      )
    ),

    # Show the table
    mainPanel(
       dataTableOutput("myTable")
    )
  )
))
Enter fullscreen mode Exit fullscreen mode

Run the app in RStudio and you'll get...

Ta da! It just works. All we have to do is set up the sparkR.session in server.R and we have access to the same commands we ran earlier in the plain R shell. Note that using collect() to convert the SparkDataFrame to a data.frame means that the data has been collected from HDFS and is being held in memory for R to use! Sometimes, this may be what you want (but usually not, if you're working with gigantic datasets). It's best practice to use as many SparkDataFrame operations as you can before converting to an R data.frame.


So there you have it! A Hadoop/Spark Big Data back end with a nice R Shiny front end. Perfect for analyzing your data lake and sending nice, polished results to the end user, with all the power of Spark behind your analysis!


Note:

A useful package in R for finding which package holds a particular function you're calling (among other things) is sos, which provides findFn():

> findFn("read.df")
found 3 matches
Downloaded 3 links in 1 packages.
Ignoring template.
Enter fullscreen mode Exit fullscreen mode

...which will pop up a window like the following in your browser:

. . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . .