Oracle Big Data SQL

Big Data SQL Hands on Lab - README.md



Welcome to the Big Data SQL Workshop! Big Data SQL is an enabling technology that allows your applications to use Oracle Database to easily gain insights across a variety of big data sources. The goal of the workshop is to give you an understanding of how Big Data SQL works. Specifically, the focus is on Big Data SQL functional capabilities: accessing data from different sources (object store, HDFS, Hive and Kafka), running queries across these sources, and applying security.

Background

The workshop is based on information from the NYC Citi Bike bike-sharing program - you can see the data here. A rider picks up a bike from a station anywhere in the city, takes a trip, and then drops the bike off at another station - which may or may not be the same one. We combine this information with historical ridership, station details and weather data - and then ask questions like:
  • What are the overall ridership trends? (from Oracle Database)
  • How is this affected by weather? (from Oracle Object Store)
  • Find trends in bicycle problems (from Hive)
  • What is the current bicycle deployment? (from Kafka)
  • How do we ensure that the right bicycle inventory is deployed to various stations? (from all of these sources)

Tools you will be using

We'll answer these questions using a variety of tools: SQL Developer, Apache Zeppelin and Hue. You may be new to Zeppelin. Zeppelin is a notebook that lets you run a variety of technologies (SQL, Python, R, Spark, etc.) - all within a single "Note". It makes it easy to jump between different technologies from within a single UI. You will run shell scripts and SQL scripts, execute interactive SQL commands, and create charts. Zeppelin may not be the best tool for any one of these tasks (e.g. I would much rather use SQL Developer for running and debugging SQL) - but it works well for this instructional workshop.

Workshop Contents

Here are the tasks that you will perform during the workshop. Each lab builds on the previous, combining data from all of the different sources:
  • Lab 100: Review Ridership - Access Weather data from Oracle Object Store and combine with ridership data in Oracle Database
  • Lab 200: Gather Station Details - Add station data from HDFS and query the JSON source using Oracle JSON dot notation
  • Lab 300: Understand Trips and Bike Usage - Access all bike trips from Hive. Use analytic SQL to understand bike movement and bike problems
  • Lab 400: How are bikes deployed right now? - Query a Kafka stream for the latest information. Combine the stream with dimensional data in Oracle Database.
  • Lab 500: Secure the data - Apply advanced Oracle security policies across all the sources

Navigate to Lab 100

  • You can see a list of Lab Guides by clicking on the Menu Icon in the upper left corner of the browser window.
You're now ready to continue with Lab 100.


Lab 100: Review Ridership

Introduction

In Lab 100, you will review CitiBike ridership trends over a several-month period using a Zeppelin note. This ridership information is available in Oracle Database 18c. You will then augment the ridership information with weather data sourced from Oracle Object Store - allowing you to see how weather potentially impacts usage. This weather data is not loaded into Oracle Database; rather, Big Data SQL allows dynamic access to this object store source through Oracle SQL.

Lab 100 Objectives

  • Use Zeppelin to report on data captured in Oracle Database 18c
  • Learn how to seamlessly access data in Oracle Object Store from Oracle Database using Big Data SQL-enabled external tables.
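In outline, a Big Data SQL external table over an object store file looks like the sketch below. This is a minimal, hedged example - the table name, columns and URL here are placeholders; the full WEATHER table definition appears in the lab script:

```sql
-- Minimal sketch only; name, columns and URL are placeholders.
CREATE TABLE weather_demo (
  report_date  VARCHAR2(20),
  temp_avg     NUMBER
)
ORGANIZATION EXTERNAL (
  TYPE ORACLE_BIGDATA                  -- Big Data SQL driver for object store files
  DEFAULT DIRECTORY DEFAULT_DIR
  ACCESS PARAMETERS (
    com.oracle.bigdata.fileformat = textfile
    com.oracle.bigdata.csv.skip.header = 1
  )
  LOCATION ('https://objectstorage.example.com/v1/bucket/weather.csv')
)
REJECT LIMIT UNLIMITED;
```

Once created, the table is queried like any other Oracle table - no data is loaded; rows are read from the object store at query time.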

Steps

STEP 1: Log into Zeppelin and Open the Big Data SQL Workshop Note

STEP 2: Review Ridership Information

STEP 3: Create and Query WEATHER Table in SQL Developer

Summary

You just created an Oracle Big Data SQL table over data stored in Oracle Object Store and queried it as you would any other Oracle table. This allows you to easily blend data across data sources to gain new insights.
This completes the Lab!
You are ready to proceed to Lab 200

Lab 200: Gather Station Details

Introduction

In Lab 200, you will create an Oracle table that refers to a file stored in HDFS. This file contains station details in JSON format. You will use Oracle SQL JSON "dot notation" to parse the fields and create a station details table.

Lab 200 Objectives

  • Connect to HUE and review the stations JSON data file stored in the HDFS filesystem
  • Create a Big Data SQL external table called bikes.stations_ext referring to this file
  • Perform some simple ETL by creating a table called bikes.stations from bikes.stations_ext using Oracle SQL JSON dot notation
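As a preview, JSON dot notation lets you project fields straight out of the document column; a minimal sketch (the full queries are in the lab script):

```sql
-- Each row of stations_ext holds one JSON document in the doc column.
select s.doc.station_id,    -- dot notation extracts a JSON field
       s.doc.name,
       s.doc.capacity
from bikes.stations_ext s;
```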

Steps

STEP 1: Check the data files in HDFS

STEP 2: Create an Oracle table called bikes.stations_ext

STEP 3: Show the JSON parsing capabilities of the Oracle Database using JSON dot notation

STEP 4: Create an Oracle table called bikes.stations using JSON dot notation

Summary

You have created a Big Data SQL external table over JSON station data stored in HDFS and used Oracle JSON dot notation to load it into the bikes.stations table.
This completes the Lab!
You are ready to proceed to Lab 300

Lab 300: Understand Trips and Bike Usage

Introduction

In Lab 300, you will access all recorded bike trips stored in Hive, create an Oracle table over them and use analytic SQL to understand bike movement and bike problems.

Lab 300 Objectives

  • Connect to HUE and review the trips Hive table
  • Create an Oracle external table over this Hive table
  • Run analytic SQL statements to understand bike movement and problems
  • Create a materialized view holding problem trips and see automatic query rewrite in action
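The Hive-backed table follows the same external-table pattern, this time with the ORACLE_HIVE driver; a trimmed sketch with placeholder columns (the full bikes.trips definition appears in the lab script):

```sql
-- Minimal sketch; columns are placeholders.
CREATE TABLE trips_demo (
  bike_id        NUMBER,
  trip_duration  NUMBER
)
ORGANIZATION EXTERNAL (
  TYPE ORACLE_HIVE                               -- reads the table definition from the Hive metastore
  DEFAULT DIRECTORY DEFAULT_DIR
  ACCESS PARAMETERS (
    com.oracle.bigdata.tablename = bikes.trips   -- the source Hive table
  )
);
```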

Steps

STEP 1: Connect to HUE and show the trips Hive table

STEP 2: Create an oracle table trip_exts

STEP 3: Run analytic SQL statements

STEP 4: Create a materialized view holding problem trips

Summary

You have accessed bike trips data stored in Hive leveraging the ORACLE_HIVE driver. You have identified the problematic bikes using analytic SQL and stored them in a materialized view.
This completes the Lab!
You are ready to proceed to Lab 400

Lab 400: How are bikes deployed right now?

Introduction

In Lab 400, you will access the latest station information, stored in a Kafka stream.

Lab 400 Objectives

  • Open Zeppelin and start the Kafka stream
  • Create an Oracle table station_status from this Kafka stream
  • Use the JSON_DATAGUIDE function to parse the JSON format and create a v_station_status view
  • Query this newly created view to show how many bikes are available per station
  • Display the results in Zeppelin

Steps

STEP 1: Connect to Zeppelin

STEP 2: Start the Kafka stream

STEP 3: Create a table from the Kafka stream

STEP 4: Use the JSON_DATAGUIDE function to parse the JSON format

STEP 5: Review the results in Zeppelin

Summary

You have accessed station status stored in a Kafka stream, used the JSON_DATAGUIDE feature to parse the JSON format and displayed bike availability per station in Zeppelin.
This completes the Lab!
You are ready to proceed to Lab 500

Lab 500: Secure the data

Introduction

In this lab, you will apply Oracle security policies to the Big Data SQL tables you have created.

Lab 500 Objectives

  • Define and use a VPD (Virtual Private Database) policy
  • Define and use a Data Redaction policy

Steps

STEP 1: Apply a VPD policy to the bikes user

STEP 2: Apply a data redaction policy to the birth_year column of the trips table

Summary

In this lab, you have applied Oracle security policies to tables created with Big Data SQL just as you would for any other table.
This completes the Lab!



Workshop SQL Script

-- Reset
drop table bikes.stations_ext;
drop table bikes.trips;
drop table bikes.station_status;
drop view v_station_status;
drop materialized view mv_problem_trips;
drop table bikes.stations;
drop function bikes.bds_vpd_station;
drop table weather;

set define off;

--
-- Lab 100
--
-- Review ridership information by day.  This summary information is in Oracle Database
-- Use Zeppelin to get a better view

select * from ridership;

-- How is ridership impacted by changes in the weather?
-- Weather data is in Oracle Object Store in a public bucket
-- Object Store data:
-- https://swiftobjectstorage.uk-london-1.oraclecloud.com/v1/adwc4pm/weather/weather-newark-airport.html

-- Create a table over that data

CREATE TABLE weather
  ( WEATHER_STATION_ID      VARCHAR2(20),
    WEATHER_STATION_NAME    VARCHAR2(100),
    REPORT_DATE             VARCHAR2(20),
    AVG_WIND                NUMBER,
    PRECIPITATION           NUMBER,
    SNOWFALL                NUMBER,
    SNOW_DEPTH              NUMBER,
    TEMP_AVG                NUMBER,
    TEMP_MAX                NUMBER,
    TEMP_MIN                NUMBER,
    WDF2                    NUMBER,
    WDF5                    NUMBER,
    WESD                    NUMBER,
    WESF                    NUMBER,
    WSF2                    NUMBER,
    WSF5                    NUMBER,
    FOG                     NUMBER,
    HEAVY_FOG               NUMBER(1),
    THUNDER                 NUMBER(1),
    SLEET                   NUMBER(1),
    HAIL                    NUMBER(1),
    GLAZE                   NUMBER(1),
    HAZE                    NUMBER(1),
    DRIFTING_SNOW           NUMBER(1),
    HIGH_WINDS              NUMBER(1)
  )
  ORGANIZATION EXTERNAL
  (TYPE ORACLE_BIGDATA
   DEFAULT DIRECTORY DEFAULT_DIR
   ACCESS PARAMETERS
   (
    com.oracle.bigdata.fileformat = textfile 
    com.oracle.bigdata.csv.skip.header=1
    com.oracle.bigdata.csv.rowformat.fields.terminator = '|'
   )
   location ('https://swiftobjectstorage.us-phoenix-1.oraclecloud.com/v1/adwc4pm/weather/*.csv')
  )  REJECT LIMIT UNLIMITED;
  
select * from weather;

-- Look at the impact of weather.  When it's hotter, no one has a problem with a little rain!
with rides_by_weather as (
    select case 
          when w.temp_avg < 32 then '32 and below'
          when w.temp_avg between 32 and 50 then '32 to 50'
          when w.temp_avg between 51 and 75 then '51 to 75'
          else '75 and higher'
        end temp_range,            
        case
          when w.precipitation = 0 then 'clear skies'
          else 'rain or snow'
        end weather,
        r.num_trips num_trips, 
        r.passes_24hr_sold,
        r.passes_3day_sold 
      from ridership r , weather w
      where r.day = w.report_date
    )
    select temp_range,
           weather,
           round(avg(num_trips)) num_trips,
           round(avg(passes_24hr_sold)) passes_24hr,
           round(avg(passes_3day_sold)) passes_3day
    from rides_by_weather
    group by temp_range, weather
    order by temp_range, weather;


--
-- Lab 200 
-- Station Data in JSON from HDFS
--
-- Let's get information about stations and how bikes are used across them.
-- The data is stored in HDFS as a station data feed: a JSON source with details about each station.
-- Hue:  http://bigdatalite.localdomain:8888/hue/filebrowser/view=/user/bikes#/data/bike-stations

CREATE TABLE bikes.stations_ext (
    doc varchar2(4000)         
 ) 
   ORGANIZATION EXTERNAL 
    ( TYPE ORACLE_HDFS
      DEFAULT DIRECTORY DEFAULT_DIR
      LOCATION ('/data/bike-stations')
    )
REJECT LIMIT UNLIMITED;


-- Query station data. Use Oracle dot notation JSON syntax to parse the data
select s.doc,
       s.doc.station_id,
       s.doc.name,
       s.doc.short_name,
       s.doc.lon as longitude,
       s.doc.lat as latitude,
       s.doc.region_id,
       s.doc.capacity,
       s.doc.eightd_station_services.service_type as service_type,
       s.doc.eightd_station_services.bikes_availability as bike_availability,
       s.doc.rental_methods,
       s.doc.rental_methods[0],
       s.doc.rental_methods[1]
from stations_ext s
where rownum < 100
;

-- Want to load the data into Oracle?  Perform some simple ETL
CREATE TABLE bikes.stations AS
SELECT to_number(s.doc.station_id) as station_id,
       s.doc.name as station_name,
       to_number(s.doc.lon) as longitude,
       to_number(s.doc.lat) as latitude,
       s.doc.region_id,
       s.doc.eightd_station_services.service_type as service_type,
       s.doc.eightd_station_services.bikes_availability as bike_availability,       
       to_number(s.doc.capacity) as capacity,
       s.doc.rental_methods
FROM stations_ext s
WHERE s.doc.name not like '%Don''t%';

select * from stations;


--
-- Lab 300  
-- Understand trip usage from data in Hive
--


-- Create table over the trips data.  This data is from a partitioned hive table.
CREATE TABLE bikes.trips 
(
  trip_duration number,
  start_time date,
  start_hour number,
  stop_time  date,
  start_station_id number,
  start_station_name varchar2(100),
  start_station_latitude number,
  start_station_longitude number,
  end_station_id number,
  end_station_name varchar2(100),
  end_station_latitude number,
  end_station_longitude number,
  bike_id number,
  user_type varchar2(50),
  birth_year number,
  gender number,
  start_month varchar2(10)
)  
  ORGANIZATION EXTERNAL 
    ( TYPE ORACLE_HIVE
      DEFAULT DIRECTORY DEFAULT_DIR
      ACCESS PARAMETERS
      (     
        com.oracle.bigdata.tablename = bikes.trips
      )
    );


-- Query it.
SELECT * 
FROM  trips
WHERE rownum < 100;

-- What are the most popular starting stations?
SELECT 
       start_station_name, 
       APPROX_RANK (ORDER BY APPROX_COUNT(*) DESC ) AS ranking,
       APPROX_COUNT(*) as num_trips
FROM trips
GROUP BY start_station_name
HAVING 
  APPROX_RANK ( 
  ORDER BY APPROX_COUNT(*) 
  DESC ) <= 5
ORDER BY 2;


-- Look at bikes and how they are deployed.
-- How many bikes were moved?  Use SQL analytic functions:
-- a bike was moved when its last drop-off station differs from its next starting station.

with bike_start_dest as (
  select bike_id,
         start_station_name,
         end_station_name,
         lag(end_station_name, 1) over (partition by bike_id order by start_time) as prev_end_station,
         to_char(start_time, 'MM/DD') as start_day,
         to_char(start_time, 'HH24:MI') as start_time,
         to_char(stop_time, 'HH24:MI') as stop_time
  from trips
  where start_month = '2019-06'
)
select prev_end_station as moved_from,
       start_station_name as moved_to,
       start_day,
       count(*) as num_bikes_moved
from bike_start_dest
where prev_end_station != start_station_name
group by prev_end_station, start_station_name, start_day
order by start_day;

-- How often was a bike used for just a couple of minutes?  That probably means there was an issue with it
WITH short_trips as (
select bike_id,
       trip_duration
from trips
where trip_duration < 120)
select bike_id, count(*) 
from short_trips 
group by bike_id 
order by 2 desc;


-- Performance - let's cache these short bike trips in an MV
CREATE MATERIALIZED VIEW mv_problem_trips
ENABLE QUERY REWRITE AS (
select *
from trips
where trip_duration < 120
);

-- What are the top 10 problematic bikes - notice automatic query rewrite
select bike_id,
       count(*) num_short_trips
from trips
where trip_duration < 120
group by bike_id
order by 2 desc
FETCH FIRST 10 ROWS ONLY;
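
-- To confirm that the optimizer rewrote the query against the MV, you can
-- inspect the execution plan; a hedged sketch (plan output varies by environment):

-- If the rewrite applied, the plan should show access of MV_PROBLEM_TRIPS
-- (e.g. a MAT_VIEW REWRITE ACCESS step) instead of a scan of the external TRIPS table.
EXPLAIN PLAN FOR
  select bike_id, count(*)
  from trips
  where trip_duration < 120
  group by bike_id;

select * from table(DBMS_XPLAN.DISPLAY());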

-- For problematic bikes, let's see how they've been used during different types of weather
-- notice the query will automatically rewrite to the MV
WITH weather_type as (
select 
    report_date,
    case 
      when w.temp_avg < 32 then '32 and below'
      when w.temp_avg between 32 and 50 then '32 to 50'
      when w.temp_avg between 51 and 75 then '51 to 75'
      else '75 and higher'
    end temp_range,            
    case
      when w.precipitation = 0 then 'clear skies'
      else 'rain or snow'
    end weather
from weather w
)
select bike_id,
       temp_range,
       weather,
       count(*),
       rank() over (  ORDER BY COUNT(*) DESC ) as ranking
from weather_type w, trips t
where to_char(t.start_time, 'fmMM/DD/YY') = report_date
and t.trip_duration < 120  -- bikes that are a problem
group by t.bike_id, w.temp_range, w.weather
order by 5;

--
-- Lab 400  
-- Current Station Status
--


--
-- Kafka - go to Zeppelin to create the Kafka topic and stream
--

-- create table over the stream

CREATE TABLE bikes.station_status
(
  topic varchar2(50),
  partitionid number,
  value clob,
  offset integer,
  timestamp timestamp,
  timestamptype integer
)  
  ORGANIZATION EXTERNAL 
    ( TYPE ORACLE_HIVE
      DEFAULT DIRECTORY DEFAULT_DIR
      ACCESS PARAMETERS
      (     
        com.oracle.bigdata.tablename = bikes.station_status
      )
    );

-- Query the stream.  The message is quite large and somewhat complex
select * from station_status where rownum < 2; 

-- Use the JSON_DATAGUIDE to make sense of the JSON
SELECT JSON_DATAGUIDE(value, DBMS_JSON.format_flat, DBMS_JSON.pretty) dg_doc
FROM   station_status
WHERE rownum < 2;

SELECT JSON_DATAGUIDE(value, DBMS_JSON.format_hierarchical, DBMS_JSON.pretty) dg_doc
FROM   station_status
WHERE  rownum < 2;

-- Use the JSON_DATAGUIDE to automatically create a view over the JSON data
BEGIN  
  DBMS_JSON.create_view(
    viewname  => 'v_station_status',
    tablename => 'station_status',
    jcolname   => 'value',
    dataguide  => '{
  "type" : "object",
  "properties" :
  {
    "ttl" :
    {
      "type" : "number",
      "o:length" : 2,
      "o:preferred_column_name" : "ttl"
    },
    "data" :
    {
      "type" : "object",
      "o:length" : 32767,
      "o:preferred_column_name" : "data",
      "properties" :
      {
        "stations" :
        {
          "type" : "array",
          "o:length" : 32767,
          "o:preferred_column_name" : "stations",
          "items" :
          {
            "properties" :
            {
              "is_renting" :
              {
                "type" : "number",
                "o:length" : 1,
                "o:preferred_column_name" : "is_renting"
              },
              "station_id" :
              {
                "type" : "string",
                "o:length" : 4,
                "o:preferred_column_name" : "station_id"
              },
              "is_installed" :
              {
                "type" : "number",
                "o:length" : 1,
                "o:preferred_column_name" : "is_installed"
              },
              "is_returning" :
              {
                "type" : "number",
                "o:length" : 1,
                "o:preferred_column_name" : "is_returning"
              },
              "last_reported" :
              {
                "type" : "number",
                "o:length" : 16,
                "o:preferred_column_name" : "last_reported"
              },
              "num_bikes_disabled" :
              {
                "type" : "number",
                "o:length" : 1,
                "o:preferred_column_name" : "num_bikes_disabled"
              },
              "num_docks_disabled" :
              {
                "type" : "number",
                "o:length" : 2,
                "o:preferred_column_name" : "num_docks_disabled"
              },
              "num_bikes_available" :
              {
                "type" : "number",
                "o:length" : 2,
                "o:preferred_column_name" : "num_bikes_available"
              },
              "num_docks_available" :
              {
                "type" : "number",
                "o:length" : 2,
                "o:preferred_column_name" : "num_docks_available"
              },
              "num_ebikes_available" :
              {
                "type" : "number",
                "o:length" : 1,
                "o:preferred_column_name" : "num_ebikes_available"
              },
              "eightd_has_available_keys" :
              {
                "type" : "boolean",
                "o:length" : 8,
                "o:preferred_column_name" : "eightd_has_available_keys"
              },
              "eightd_active_station_services" :
              {
                "type" : "array",
                "o:length" : 64,
                "o:preferred_column_name" : "eightd_active_station_services",
                "items" :
                {
                  "properties" :
                  {
                    "id" :
                    {
                      "type" : "string",
                      "o:length" : 64,
                      "o:preferred_column_name" : "id"
                    }
                  }
                }
              }
            }
          }
        }
      }
    },
    "last_updated" :
    {
      "type" : "number",
      "o:length" : 16,
      "o:preferred_column_name" : "last_updated"
    }
  }
}');
END;
/

desc v_station_status;

-- Get the latest message from Kafka
select "station_id",
       "num_bikes_available",
       timestamp
from v_station_status
where timestamp in (
    select max(timestamp)
    from station_status
    where timestamp < current_timestamp - interval '10' second
    );

-- Join to station lookup table.  Take a better look at this in Zeppelin
WITH latest_status as (
    select "station_id" as station_id,
           "num_bikes_available" as num_bikes_available,
           timestamp
    from v_station_status
    where timestamp in (
        select max(timestamp)
        from station_status
        where timestamp < current_timestamp - interval '100' second
        )
    )
select l.station_id,
       s.station_name,
       num_bikes_available
from latest_status l, stations s
where l.station_id = s.station_id
and region_id = 70
;


--
-- Lab 500
-- Security

-- Row level security
-- Create a VPD Policy
-- Only look at the station information that you are in charge of

CREATE OR REPLACE FUNCTION BIKES.BDS_VPD_STATION (obj_schema varchar2, obj_name VARCHAR2) RETURN VARCHAR2 AS 
  p_emp varchar2(100);
  p_retval varchar2(200);
BEGIN
  -- Bikes user is only allowed to see station info for Grove Street (3186)
  p_emp := sys_context('USERENV','AUTHENTICATED_IDENTITY');
  
  p_retval := case when p_emp = 'BIKES' then 'start_station_id=3186 or end_station_id=3186'
              else '1=1'
              end;
  
  
  RETURN p_retval;
END BDS_VPD_STATION;
/
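
-- Because the policy function simply returns a predicate string, you can
-- preview it before attaching the policy - a quick sanity check (the result
-- depends on the session's authenticated identity):
select bikes.bds_vpd_station('BIKES', 'TRIPS') as predicate from dual;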

-- Add the VPD Policy
BEGIN
  dbms_rls.add_policy(object_schema => 'BIKES',
    object_name     => 'TRIPS',
    policy_name     => 'FILTER_TRIPS',
    function_schema => 'BIKES',
    policy_function => 'BDS_VPD_STATION',
    statement_types => 'select');
END;
/

select start_station_name, 
       end_station_name, 
       start_time,
       stop_time,
       bike_id
from trips 
where rownum < 100;

-- Redact age
select start_station_name, 
       end_station_name, 
       start_time,
       bike_id,
       birth_year       
from trips 
where rownum < 100;

BEGIN
  DBMS_REDACT.ADD_POLICY(
    object_schema => 'BIKES',
    object_name => 'TRIPS',
    column_name => 'BIRTH_YEAR',
    policy_name => 'redact_birth_year',
    function_type => DBMS_REDACT.FULL,
    expression => q'[SYS_CONTEXT('USERENV','AUTHENTICATED_IDENTITY') = 'BIKES']'
  );
  
END;
/


select start_station_name, 
       end_station_name, 
       start_time,
       bike_id,
       birth_year       
from trips 
where rownum < 100;
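
-- Note: the reset script at the top drops the tables but not these policies.
-- A hedged cleanup sketch if you rerun the lab (run before re-creating the objects):
BEGIN
  DBMS_RLS.DROP_POLICY(
    object_schema => 'BIKES',
    object_name   => 'TRIPS',
    policy_name   => 'FILTER_TRIPS');
  DBMS_REDACT.DROP_POLICY(
    object_schema => 'BIKES',
    object_name   => 'TRIPS',
    policy_name   => 'redact_birth_year');
END;
/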
