
Risk Analysis with Spark

GOAL - Calculate risk factors for drivers with Spark and utilize Zeppelin to visualize the results -- this demo will produce the same results as Risk Analysis with Pig

PREREQUISITE - Risk Analysis with Pig

SEE ALSO - This demo is based on these two publicly available Hortonworks tutorials:


Leveraging Hive


Prepare Coding Environment

As directed from the Sandbox Splash Page, open Zeppelin by navigating your browser to the Zeppelin address given there. Once there, click on Notebook > Create new note and name it "Compute Riskfactor with Spark".


Get started by adding these Scala imports to your Zeppelin notebook, then click the play button (or press Shift+Enter) to run the code.

import org.apache.spark.sql.hive.orc._
import org.apache.spark.sql._


Instantiate a HiveContext by adding the following line of code to the next code block window, then run it.

val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)


NOTE: Continue with this pattern of putting subsequent code in following code block windows (aka paragraphs) to avoid having to rerun any prior coding snippets.

Load DataFrame Objects

View the list of tables available from Hive.

hiveContext.sql("show tables").collect.foreach(println)

Load the contents of the geolocation Hive table into a DataFrame for use within Spark.

val geolocation_all_DF = hiveContext.sql("SELECT * FROM geolocation")

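Display its results.

geolocation_all_DF.show()

Register this DataFrame as the temporary table geolocation_all_temp so it can be used within SQL statements.

geolocation_all_DF.registerTempTable("geolocation_all_temp")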


Transform/Process Data

Filter out the drivers who only have "normal" driving events and calculate the count of risky events for each remaining driver. Then register this new DataFrame to allow subsequent SQL operations.

val risky_driver_event_counts_DF = hiveContext.sql(
    "SELECT driverid, count(driverid) occurance " +
    "  FROM geolocation_all_temp " +
    " WHERE event != 'normal' " +
    " GROUP BY driverid ")

risky_driver_event_counts_DF.registerTempTable("risky_driver_event_counts_temp")

Review these results.

risky_driver_event_counts_DF.show()

NOTE: You can also do this with the DataFrame API instead of a SQL query.

geolocation_all_DF.filter("event != 'normal'").
    groupBy("driverid").count().withColumnRenamed("count", "occurance").
    show()

NOTE: To conserve resources in the notebook, use the "Clear Output" functionality: click the gear icon in the upper-right corner of this paragraph and then select "Clear Output", as shown in the following screenshot.

alt text
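To make the filter-then-count logic concrete, here is a plain-Scala sketch (no Spark or Hive needed) that applies the same rules to a handful of rows. The driver IDs and event names are hypothetical sample values, not the tutorial's data.

```scala
// Plain-Scala sketch of the risky-event count: drop "normal" events,
// then count the remaining events per driver. Sample rows are hypothetical.
object RiskyEventCounts {
  // Hypothetical (driverid, event) pairs standing in for geolocation rows
  val events: Seq[(String, String)] = Seq(
    ("A1", "normal"),
    ("A1", "overspeed"),
    ("A1", "lane departure"),
    ("A2", "normal"),
    ("A2", "normal"),
    ("A3", "unsafe following distance")
  )

  // WHERE event != 'normal' GROUP BY driverid, with count(driverid) as occurance
  val occurance: Map[String, Int] =
    events
      .filter { case (_, event) => event != "normal" }
      .groupBy { case (driverid, _) => driverid }
      .map { case (driverid, rows) => driverid -> rows.size }
}
```

Driver A2 drops out entirely because all of its events are normal, which is exactly the "filter out drivers who only have normal events" behavior of the query.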

Now, join the temporary table that holds the risky drivers and their counts with Hive's driver_mileage table created in Risk Analysis with Pig, then store that DataFrame as a temporary table.

val joined_DF = hiveContext.sql(
    "SELECT rd.driverid, rd.occurance, dm.totmiles " + 
    "  FROM risky_driver_event_counts_temp rd, driver_mileage dm " +
    " WHERE rd.driverid = dm.driverid")

joined_DF.registerTempTable("joined_temp")

Review the results of the join.

joined_DF.show()

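Again, you can do this with the DataFrame API instead. Notice that we first had to load the driver_mileage table into a DataFrame.

val driver_mileage_DF = hiveContext.sql("SELECT * FROM driver_mileage")

risky_driver_event_counts_DF.join(driver_mileage_DF,
    risky_driver_event_counts_DF("driverid") === driver_mileage_DF("driverid"),
    "inner").select(risky_driver_event_counts_DF("driverid"), risky_driver_event_counts_DF("occurance"), driver_mileage_DF("totmiles")).show()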

NOTE: As before, use the "Clear Output" functionality on this unwanted paragraph.
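The join keeps only drivers that appear on both sides. The plain-Scala sketch below (no Spark) shows that inner-join behavior on made-up values; the counts and mileages are hypothetical.

```scala
// Plain-Scala sketch of the inner join on driverid between the risky-event
// counts and driver_mileage. All values are hypothetical.
object JoinSketch {
  // Hypothetical risky-event counts: driverid -> occurance
  val riskyCounts: Map[String, Long] = Map("A1" -> 2L, "A3" -> 1L)

  // Hypothetical driver_mileage rows: driverid -> totmiles
  val driverMileage: Map[String, Long] =
    Map("A1" -> 1000L, "A2" -> 500L, "A3" -> 300L)

  // Inner join: a row survives only if its driverid exists on both sides,
  // giving (driverid, occurance, totmiles) like joined_DF
  val joined: Seq[(String, Long, Long)] =
    riskyCounts.toSeq.sortBy(_._1).flatMap { case (driverid, occurance) =>
      driverMileage.get(driverid).map(totmiles => (driverid, occurance, totmiles))
    }
}
```

Driver A2 has mileage but no risky events, so it does not appear in the joined result.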

Calculate Risk

Create a new DataFrame that contains, by driver, the risk calculation of total miles driven divided by the number of risky events. Register this as a temporary table and dump the contents to the notebook.

val risk_factor_calc_DF = hiveContext.sql(
    "SELECT driverid, occurance, totmiles, totmiles/occurance riskfactor " +
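    "  FROM joined_temp")

// temporary table name assumed, following the *_temp naming used earlier
risk_factor_calc_DF.registerTempTable("risk_factor_calc_temp")
risk_factor_calc_DF.show()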
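The risk factor is plain arithmetic, riskfactor = totmiles / occurance, so a larger value means more miles driven per risky event. The plain-Scala sketch below (no Spark, hypothetical rows) computes it in floating point, because HiveQL's `/` operator returns a double even for integer operands.

```scala
// Plain-Scala sketch of riskfactor = totmiles / occurance. Rows are hypothetical.
object RiskFactorSketch {
  // Hypothetical joined rows: (driverid, occurance, totmiles)
  val joined: Seq[(String, Long, Long)] = Seq(("A1", 2L, 1000L), ("A3", 4L, 250L))

  // Convert to Double before dividing; Long / Long in Scala would truncate
  val riskFactors: Seq[(String, Double)] =
    joined.map { case (driverid, occurance, totmiles) =>
      (driverid, totmiles.toDouble / occurance)
    }
}
```

For driver A3 this yields 62.5; integer division (250L / 4L) would silently give 62.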

Create a new table, similar to the risk_factor one created in Risk Analysis with Pig, to save the newly calculated risk values from Spark.

hiveContext.sql(
    "CREATE TABLE risk_factor_spark( " +
    "   driverid String, events bigint, totmiles bigint, riskfactor double) " +
    "STORED AS ORC")

Verify from the Hive View (logging in as admin is fine) that this table has been created and is empty.

Since we created this table to be backed by the ORC file format, we need to persist risk_factor_calc_DF to disk in that format.

risk_factor_calc_DF.write.format("orc").save("risk_factor_calc_ORC")


Verify from the HDFS Files View that this information was persisted.


Load the data into Hive.

hiveContext.sql(
    "LOAD DATA INPATH 'risk_factor_calc_ORC' " +
    "INTO TABLE risk_factor_spark")

Verify that the data made it into the table by querying it from the Hive View.


Visualize Data

In a fresh paragraph, add the following query and execute it.

%sql
SELECT * FROM risk_factor_spark

Explore the various charts that are available from the toolbar highlighted below.

alt text

Choose the Bar Chart icon, click on settings next to the toolbar, and then set driverid in the Keys field and riskfactor in the Values field to see the peaks and valleys of risk factors across drivers.


NOTE: As before, click on "Clear output" to free up visual space and system resources. Additionally (especially on earlier versions of the Sandbox), it may become necessary to use Ambari to stop and start services. Ideally, shut down Zeppelin and then Spark, and start them up in the reverse order.

Create a new notebook and run another query to visualize.

%sql
SELECT r.driverid, r.riskfactor, g.city, g.state
  FROM risk_factor_spark r, geolocation g
 WHERE r.driverid = g.driverid
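Because the geolocation table holds one row per driving event, this join returns a row for every matching event, so each driver's riskfactor repeats once per event. The plain-Scala sketch below (no Spark) illustrates that fan-out; the risk factors, cities, and states are hypothetical sample values.

```scala
// Plain-Scala sketch of joining risk_factor_spark with geolocation on driverid.
// All values are hypothetical.
object VisualizationJoinSketch {
  // Hypothetical risk_factor_spark rows: driverid -> riskfactor
  val riskFactor: Map[String, Double] = Map("A1" -> 500.0, "A3" -> 62.5)

  // Hypothetical geolocation rows, one per event: (driverid, city, state)
  val geolocation: Seq[(String, String, String)] = Seq(
    ("A1", "Sacramento", "California"),
    ("A1", "Sacramento", "California"),
    ("A3", "Fresno", "California")
  )

  // WHERE r.driverid = g.driverid: every matching event row yields an output row
  val rows: Seq[(String, Double, String, String)] =
    geolocation.flatMap { case (driverid, city, state) =>
      riskFactor.get(driverid).map(rf => (driverid, rf, city, state))
    }
}
```

Driver A1 has two events, so it appears twice in the result with the same riskfactor.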

This time, pick the right-most icon in the toolbar and, in its settings, select:

  • driverid for xAxis
  • riskfactor for yAxis
  • city for group
  • riskfactor for size


This is just a quick intro to the types of visualization opportunities that exist with Zeppelin.