Oracle Big Data SQL
Welcome to the Big Data SQL Workshop! Big Data SQL is an enabling technology that allows your applications to use Oracle Database to easily gain insights across a variety of big data sources. The goal of the workshop is to give you an understanding of how Big Data SQL works. Specifically, the focus is on Big Data SQL functional capabilities: how to access data from different sources (object store, HDFS, Hive and Kafka), how to run queries across these sources, and how to apply security.
Background
The workshop is based on information from the NYC Citi Bikes Ridesharing program - you can see the data here. A rider picks up a bike from a station anywhere in the city, takes a trip, and then drops the bike off at a station. The ending station may or may not be the same as the starting station. We combine this information with historical ridership, station details and weather data - and then ask questions like:
- What are the overall ridership trends? (from Oracle Database)
- How is this affected by weather? (from Oracle Object Store)
- What are the trends in bicycle problems? (from Hive)
- What is the current bicycle deployment? (from Kafka)
- How do we ensure that the right bicycle inventory is deployed to various stations? (from all of these sources)
Tools you will be using
We'll answer these questions using a variety of tools: SQL Developer, Apache Zeppelin and Hue. You may be new to Zeppelin. Zeppelin is a notebook that lets you run a variety of technologies (SQL, Python, R, Spark, etc.) - all within a single "Note". It makes it easy to jump between different technologies from within a single UI. You will be running shell scripts and SQL scripts, issuing interactive SQL commands and creating charts. Zeppelin may not be the best tool for any one of these tasks (e.g. I would much rather be using SQL Developer for running/debugging SQL) - but it works well for this instructional workshop.
Workshop Contents
Here are the tasks that you will perform during the workshop. Each lab builds on the previous, combining data from all of the different sources:
- Lab 100: Review Ridership - Access Weather data from Oracle Object Store and combine with ridership data in Oracle Database
- Lab 200: Gather Station Details - Add station data from HDFS and query the JSON source using Oracle JSON dot notation
- Lab 300: Understand Trips and Bike Usage - Access all bike trips from Hive. Use analytic SQL to understand bike movement and bike problems
- Lab 400: How are bikes deployed right now? - Query a Kafka stream for the latest information. Combine the stream with dimensional data in Oracle Database.
- Lab 500: Secure the data - Apply advanced Oracle security policies across all the sources
Navigate to Lab 100
- You can see a list of Lab Guides by clicking on the Menu Icon in the upper left corner of the browser window.
You're now ready to continue with Lab 100.
Lab 100: Review Ridership
Introduction
In Lab 100, you will review CitiBike ridership trends over a period of several months using a Zeppelin note. This ridership information is available in Oracle Database 18c. You will then augment ridership information with weather data sourced from Oracle Object Store - allowing you to see how weather potentially impacts usage. This weather data is not loaded into Oracle Database; rather, Big Data SQL will allow dynamic access to this new object store source through Oracle SQL.
Lab 100 Objectives
- Use Zeppelin to report on data captured in Oracle Database 18c
- Learn how to seamlessly access data in Oracle Object Store from Oracle Database using Big Data SQL-enabled external tables.
Steps
STEP 1: Log into Zeppelin and Open the Big Data SQL Workshop Note
STEP 2: Review Ridership Information
STEP 3: Create and Query WEATHER Table in SQL Developer
Summary
You just created an Oracle Big Data SQL table over data stored in Oracle Object Store and queried it as you would any other Oracle table. This allows you to easily blend data across data sources to gain new insights.
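One way to confirm that WEATHER really is an external table, and to see which object store location it points at, is to query the data dictionary. The following is a minimal sketch, assuming you are connected as the schema that owns the WEATHER table; the dictionary views are standard Oracle Database views, but the exact values they return depend on your environment.

```sql
-- Sketch only: inspect the external table definition for WEATHER.
-- Assumption: the current user owns the WEATHER table.
select table_name,
       type_name,                -- access driver; ORACLE_BIGDATA is expected for this lab
       default_directory_name,
       reject_limit
from   user_external_tables
where  table_name = 'WEATHER';

-- The location clause (the object store URL pattern) is listed separately.
select table_name,
       location
from   user_external_locations
where  table_name = 'WEATHER';
```

Because no rows are stored in the database, dropping the table removes only this metadata; the files in the object store are left untouched.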
This completes the Lab!
You are ready to proceed to Lab 200
Lab 200: Gather Station Details
Introduction
In Lab 200, you will create an Oracle table that refers to a file stored in HDFS. This file contains station details data in JSON format. You will use Oracle SQL JSON "dot notation" to parse the fields and create a station details table.
Lab 200 Objectives
- Connect to HUE and review the stations JSON data file stored in the HDFS filesystem
- Create a Big Data SQL external table called bikes.stations_ext referring to this file
- Perform some simple ETL by creating a table called bikes.stations from the bikes.stations_ext table using Oracle SQL JSON dot notation
Steps
STEP 1: Check the data files in HDFS
STEP 2: Create an Oracle table called bikes.stations_ext
STEP 3: Show the JSON parsing capabilities of the Oracle Database using the JSON dot notation
STEP 4: Create an Oracle table called bikes.stations using JSON dot notation
Summary
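The dot notation used in this lab is shorthand for the SQL/JSON functions. As a hedged illustration, the query below reads a few of the same fields with JSON_VALUE and JSON_QUERY instead. It assumes the bikes.stations_ext table from STEP 2 exists with its single doc column, and that the JSON field names (station_id, name, capacity, rental_methods) match those used elsewhere in this workshop.

```sql
-- Sketch only: the same station fields read with explicit SQL/JSON functions.
-- Assumption: bikes.stations_ext(doc varchar2(4000)) has been created as in STEP 2.
select json_value(s.doc, '$.station_id' returning number) as station_id,
       json_value(s.doc, '$.name')                        as station_name,
       json_value(s.doc, '$.capacity' returning number)   as capacity,
       -- JSON_QUERY returns a JSON fragment (here the whole array) rather than a scalar
       json_query(s.doc, '$.rental_methods')              as rental_methods
from   bikes.stations_ext s
where  rownum < 10;
```

JSON_VALUE returns NULL by default when a field is missing or cannot be converted to the requested type, so unexpected NULLs are a hint to check the path or the RETURNING clause.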
Lab 300: Understand Trips and Bike Usage
Introduction
In Lab 300 you will access all recorded bike trips stored in Hive, create an Oracle table over them and use analytical SQL to understand bike movement and bike problems.
Lab 300 Objectives
- Connect to HUE and show the trips Hive table
- Create an Oracle table out of this Hive table
- Run analytical SQL statements to understand bike movement and problems
- Create a materialized view holding problem trips and see automatic query rewrite happening
Steps
STEP 1: Connect to HUE and show the trips Hive table
STEP 2: Create an Oracle table trip_exts
STEP 3: Run analytical SQL statements
STEP 4: Create a materialized view holding trips problems
Summary
You have accessed bike trip data stored in Hive using the ORACLE_HIVE driver. You have identified the problematic bikes using analytical SQL and stored them in a materialized view.
This completes the Lab!
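To check whether a query over the trips table is actually rewritten to use the materialized view, you can look at its execution plan. This is a minimal sketch, assuming the trips table and the mv_problem_trips materialized view from this lab exist in your schema; the commented-out ALTER SESSION line is an assumption about what may be needed for materialized views over external tables, not a confirmed requirement of this workshop.

```sql
-- Sketch only: inspect the plan of a short-trips query.
-- Assumption: trips and mv_problem_trips exist in the current schema.
explain plan for
select bike_id,
       count(*) as num_short_trips
from   trips
where  trip_duration < 120
group  by bike_id;

-- If rewrite happened, the plan should reference MV_PROBLEM_TRIPS
-- instead of an external table scan on TRIPS.
select * from table(dbms_xplan.display);

-- Assumption: if the plan still shows TRIPS, the session rewrite integrity
-- level may be the cause. Uncomment to experiment:
-- alter session set query_rewrite_integrity = stale_tolerated;
```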
You are ready to proceed to Lab 400
Lab 400: How are bikes deployed right now?
Introduction
In Lab 400 you will access the latest station information, stored in a Kafka stream.
Lab 400 Objectives
- Open Zeppelin and start the Kafka stream
- Create an Oracle table station_status from this Kafka stream
- Use the JSON_DATAGUIDE function to parse the JSON format and create a v_station_status view
- Query this newly created view to show how many bikes are available per station
- Display the results in Zeppelin
Steps
STEP 1: Connect to Zeppelin
STEP 2: Start the Kafka stream
STEP 3: Create a table from the Kafka stream
STEP 4: Use JSON_DATAGUIDE function to parse JSON format
STEP 5: Review the results in Zeppelin
Summary
You have accessed station status stored in a Kafka stream, used the JSON_DATAGUIDE feature to parse the JSON format and displayed bike availability per station in Zeppelin.
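An alternative to a generated view is to unnest the stations array directly with JSON_TABLE. The sketch below is an illustration under assumptions: it assumes the station_status table from this lab exists with its value column holding the Kafka message, and that the message has the data.stations[*] structure with station_id and num_bikes_available fields used elsewhere in this workshop.

```sql
-- Sketch only: unnest the stations array of each Kafka message with JSON_TABLE.
-- Assumption: station_status(value clob, timestamp timestamp, ...) exists as created in STEP 3.
select s.timestamp,
       jt.station_id,
       jt.num_bikes_available
from   station_status s,
       json_table(s.value, '$.data.stations[*]'
         columns (
           station_id          varchar2(4) path '$.station_id',
           num_bikes_available number      path '$.num_bikes_available'
         )
       ) jt
where  rownum < 20;
```

Each message produces one output row per element of the stations array, which is the same shape the view exposes; the difference is that the column list here is written by hand rather than derived from a data guide.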
This completes the Lab!
You are ready to proceed to Lab 500
Lab 500: Secure the data
Introduction
In this lab you will apply Oracle security policies to the Big Data SQL tables you have created.
Lab 500 Objectives
- Define and use a VPD (Virtual Private Database) policy
- Define and use a Data Redaction policy
Steps
STEP 1: Apply a VPD policy to the bikes user
STEP 2: Apply a data redaction policy to the birth_year column of the trips table
Summary
In this lab you have applied Oracle security policies to tables created with Big Data SQL, just as you would for any other table.
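If you want to rerun the lab, the policies need to be removed before they can be added again. The following is a minimal cleanup sketch, assuming the policy names FILTER_TRIPS and redact_birth_year on BIKES.TRIPS used in this workshop, and assuming the user running it has execute privileges on DBMS_RLS and DBMS_REDACT.

```sql
-- Sketch only: remove the VPD and redaction policies created in this lab.
-- Assumption: the policies exist; each call raises an error if its policy is missing.
BEGIN
  dbms_rls.drop_policy(
    object_schema => 'BIKES',
    object_name   => 'TRIPS',
    policy_name   => 'FILTER_TRIPS');

  dbms_redact.drop_policy(
    object_schema => 'BIKES',
    object_name   => 'TRIPS',
    policy_name   => 'redact_birth_year');
END;
/
```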
This completes the Lab!
-- Reset
drop table bikes.stations_ext;
drop table bikes.trips;
drop table bikes.station_status;
drop view v_station_status;
drop materialized view mv_problem_trips;
drop table bikes.stations;
drop function bikes.bds_vpd_station;
drop table weather;
set define off;

--
-- Lab 100
--
-- Review ridership information by day. This summary information is in Oracle Database
-- Use Zeppelin to get a better view
select * from ridership;

-- How is ridership impacted by changes in the weather?
-- Weather data is in Oracle Object Store in a public bucket
-- Object Store data:
-- https://swiftobjectstorage.uk-london-1.oraclecloud.com/v1/adwc4pm/weather/weather-newark-airport.html
-- Create a table over that data
CREATE TABLE weather (
  WEATHER_STATION_ID   VARCHAR2(20),
  WEATHER_STATION_NAME VARCHAR2(100),
  REPORT_DATE          VARCHAR2(20),
  AVG_WIND             NUMBER,
  PRECIPITATION        NUMBER,
  SNOWFALL             NUMBER,
  SNOW_DEPTH           NUMBER,
  TEMP_AVG             NUMBER,
  TEMP_MAX             NUMBER,
  TEMP_MIN             NUMBER,
  WDF2                 NUMBER,
  WDF5                 NUMBER,
  WESD                 NUMBER,
  WESF                 NUMBER,
  WSF2                 NUMBER,
  WSF5                 NUMBER,
  FOG                  NUMBER,
  HEAVY_FOG            NUMBER(1),
  THUNDER              NUMBER(1),
  SLEET                NUMBER(1),
  HAIL                 NUMBER(1),
  GLAZE                NUMBER(1),
  HAZE                 NUMBER(1),
  DRIFTING_SNOW        NUMBER(1),
  HIGH_WINDS           NUMBER(1)
)
ORGANIZATION EXTERNAL (
  TYPE ORACLE_BIGDATA
  DEFAULT DIRECTORY DEFAULT_DIR
  ACCESS PARAMETERS (
    com.oracle.bigdata.fileformat = textfile
    com.oracle.bigdata.csv.skip.header=1
    com.oracle.bigdata.csv.rowformat.fields.terminator = '|'
  )
  location ('https://swiftobjectstorage.us-phoenix-1.oraclecloud.com/v1/adwc4pm/weather/*.csv')
)
REJECT LIMIT UNLIMITED;

select * from weather;

-- Look at the impact of weather. When it's hotter, no one has a problem with a little rain!
with rides_by_weather as (
  select case
           when w.temp_avg < 32 then '32 and below'
           when w.temp_avg between 32 and 50 then '32 to 50'
           when w.temp_avg between 51 and 75 then '51 to 75'
           else '75 and higher'
         end temp_range,
         case
           when w.precipitation = 0 then 'clear skies'
           else 'rain or snow'
         end weather,
         r.num_trips num_trips,
         r.passes_24hr_sold,
         r.passes_3day_sold
  from ridership r, weather w
  where r.day = w.report_date
)
select temp_range,
       weather,
       round(avg(num_trips)) num_trips,
       round(avg(passes_24hr_sold)) passes_24hr,
       round(avg(passes_3day_sold)) passes_3day
from rides_by_weather
group by temp_range, weather
order by temp_range, weather;

--
-- Lab 200
-- Station Data in JSON from HDFS
--
-- Let's get information about Stations and how bikes are used across them.
-- We'll want to know which stations are
-- Look at data stored in HDFS. Station data feed.
-- Station data over JSON source. Details about each station.
-- Hue: http://bigdatalite.localdomain:8888/hue/filebrowser/view=/user/bikes#/data/bike-stations
CREATE TABLE bikes.stations_ext (
  doc varchar2(4000)
)
ORGANIZATION EXTERNAL (
  TYPE ORACLE_HDFS
  DEFAULT DIRECTORY DEFAULT_DIR
  LOCATION ('/data/bike-stations')
)
REJECT LIMIT UNLIMITED;

-- Query station data. Use Oracle dot notation JSON syntax to parse the data
select s.doc,
       s.doc.station_id,
       s.doc.name,
       s.doc.short_name,
       s.doc.lon as longitude,
       s.doc.lat as latitude,
       s.doc.region_id,
       s.doc.capacity,
       s.doc.eightd_station_services.service_type as service_type,
       s.doc.eightd_station_services.bikes_availability as bike_availability,
       s.doc.rental_methods,
       s.doc.rental_methods[0],
       s.doc.rental_methods[1]
from stations_ext s
where rownum < 100;

-- Want to load the data into Oracle? Perform some simple ETL
CREATE TABLE bikes.stations AS
SELECT to_number(s.doc.station_id) as station_id,
       s.doc.name as station_name,
       to_number(s.doc.lon) as longitude,
       to_number(s.doc.lat) as latitude,
       s.doc.region_id,
       s.doc.eightd_station_services.service_type as service_type,
       s.doc.eightd_station_services.bikes_availability as bike_availability,
       to_number(s.doc.capacity) as capacity,
       s.doc.rental_methods
FROM stations_ext s
WHERE s.doc.name not like '%Don''t%';

select * from stations;

--
-- Lab 300
-- Understand trip usage from data in Hive
--
-- Create table over the trips data. This data is from a partitioned hive table.
CREATE TABLE bikes.trips (
  trip_duration           number,
  start_time              date,
  start_hour              number,
  stop_time               date,
  start_station_id        number,
  start_station_name      varchar2(100),
  start_station_latitude  number,
  start_station_longitude number,
  end_station_id          number,
  end_station_name        varchar2(100),
  end_station_latitude    number,
  end_station_longitude   number,
  bike_id                 number,
  user_type               varchar2(50),
  birth_year              number,
  gender                  number,
  start_month             varchar2(10)
)
ORGANIZATION EXTERNAL (
  TYPE ORACLE_HIVE
  DEFAULT DIRECTORY DEFAULT_DIR
  ACCESS PARAMETERS (
    com.oracle.bigdata.tablename = bikes.trips
  )
);

-- Query it.
SELECT * FROM trips WHERE rownum < 100;

-- What are the most popular starting stations?
SELECT start_station_name,
       APPROX_RANK (ORDER BY APPROX_COUNT(*) DESC) AS ranking,
       APPROX_COUNT(*) as num_trips
FROM trips
GROUP BY start_station_name
HAVING APPROX_RANK (ORDER BY APPROX_COUNT(*) DESC) <= 5
ORDER BY 2;

-- Look at bikes and how they are deployed.
-- How many bikes were moved? Use SQL Analytic Functions
-- A bike was moved if its previous drop-off station differs from its next starting station
with bike_start_dest as (
  select bike_id,
         start_station_name,
         end_station_name,
         lag(end_station_name, 1) over (partition by bike_id order by start_time) as prev_end_station,
         to_char(start_time, 'MM/DD') as start_day,
         to_char(start_time, 'HH24:MI') as start_time,
         to_char(stop_time, 'HH24:MI') as stop_time
  from trips
  where start_month = '2019-06'
)
select prev_end_station as moved_from,
       start_station_name as moved_to,
       start_day,
       count(*) as num_bikes_moved
from bike_start_dest
where prev_end_station != start_station_name
group by prev_end_station, start_station_name, start_day
order by start_day;

-- How often was a bike used for just a couple of minutes? Probably means that there was an issue with it
WITH short_trips as (
  select bike_id, trip_duration
  from trips
  where trip_duration < 120
)
select bike_id, count(*)
from short_trips
group by bike_id
order by 2 desc;

-- Performance - let's cache these short bike trips into an MV
CREATE MATERIALIZED VIEW mv_problem_trips
ENABLE QUERY REWRITE AS (
  select *
  from trips
  where trip_duration < 120
);

-- What are the top 10 problematic bikes - notice automatic query rewrite
select bike_id, count(*) num_short_trips
from trips
where trip_duration < 120
group by bike_id
order by 2 desc
FETCH FIRST 10 ROWS ONLY;

-- For problematic bikes, let's see how they've been used during different types of weather
-- notice the query will automatically rewrite to the MV
WITH weather_type as (
  select report_date,
         case
           when w.temp_avg < 32 then '32 and below'
           when w.temp_avg between 32 and 50 then '32 to 50'
           when w.temp_avg between 51 and 75 then '51 to 75'
           else '75 and higher'
         end temp_range,
         case
           when w.precipitation = 0 then 'clear skies'
           else 'rain or snow'
         end weather
  from weather w
)
select bike_id,
       temp_range,
       weather,
       count(*),
       rank() over (ORDER BY COUNT(*) DESC) as ranking
from weather_type w, trips t
where to_char(t.start_time, 'fmMM/DD/YY') = report_date
and t.trip_duration < 120 -- bikes that are a problem
group by t.bike_id, w.temp_range, w.weather
order by 5;

--
-- Lab 400
-- Current Station Status
--
--
-- Kafka - Go to Zeppelin to create the Kafka topic and stream
--
-- create table over the stream
CREATE TABLE bikes.station_status (
  topic         varchar2(50),
  partitionid   number,
  value         clob,
  offset        integer,
  timestamp     timestamp,
  timestamptype integer
)
ORGANIZATION EXTERNAL (
  TYPE ORACLE_HIVE
  DEFAULT DIRECTORY DEFAULT_DIR
  ACCESS PARAMETERS (
    com.oracle.bigdata.tablename = bikes.station_status
  )
);

-- Query the stream. The message is quite large and somewhat complex
select * from station_status where rownum < 2;

-- Use the JSON_DATAGUIDE to make sense of the JSON
SELECT JSON_DATAGUIDE(value, DBMS_JSON.format_flat, DBMS_JSON.pretty) dg_doc
FROM station_status
WHERE rownum < 2;

SELECT JSON_DATAGUIDE(value, DBMS_JSON.format_hierarchical, DBMS_JSON.pretty) dg_doc
FROM station_status
WHERE rownum < 2;

-- Use the JSON_DATAGUIDE to automatically create a view over the JSON data
BEGIN
  DBMS_JSON.create_view(
    viewname  => 'v_station_status',
    tablename => 'station_status',
    jcolname  => 'value',
    dataguide => '{
      "type" : "object",
      "properties" : {
        "ttl" : { "type" : "number", "o:length" : 2, "o:preferred_column_name" : "ttl" },
        "data" : {
          "type" : "object",
          "o:length" : 32767,
          "o:preferred_column_name" : "data",
          "properties" : {
            "stations" : {
              "type" : "array",
              "o:length" : 32767,
              "o:preferred_column_name" : "stations",
              "items" : {
                "properties" : {
                  "is_renting" : { "type" : "number", "o:length" : 1, "o:preferred_column_name" : "is_renting" },
                  "station_id" : { "type" : "string", "o:length" : 4, "o:preferred_column_name" : "station_id" },
                  "is_installed" : { "type" : "number", "o:length" : 1, "o:preferred_column_name" : "is_installed" },
                  "is_returning" : { "type" : "number", "o:length" : 1, "o:preferred_column_name" : "is_returning" },
                  "last_reported" : { "type" : "number", "o:length" : 16, "o:preferred_column_name" : "last_reported" },
                  "num_bikes_disabled" : { "type" : "number", "o:length" : 1, "o:preferred_column_name" : "num_bikes_disabled" },
                  "num_docks_disabled" : { "type" : "number", "o:length" : 2, "o:preferred_column_name" : "num_docks_disabled" },
                  "num_bikes_available" : { "type" : "number", "o:length" : 2, "o:preferred_column_name" : "num_bikes_available" },
                  "num_docks_available" : { "type" : "number", "o:length" : 2, "o:preferred_column_name" : "num_docks_available" },
                  "num_ebikes_available" : { "type" : "number", "o:length" : 1, "o:preferred_column_name" : "num_ebikes_available" },
                  "eightd_has_available_keys" : { "type" : "boolean", "o:length" : 8, "o:preferred_column_name" : "eightd_has_available_keys" },
                  "eightd_active_station_services" : {
                    "type" : "array",
                    "o:length" : 64,
                    "o:preferred_column_name" : "eightd_active_station_services",
                    "items" : {
                      "properties" : {
                        "id" : { "type" : "string", "o:length" : 64, "o:preferred_column_name" : "id" }
                      }
                    }
                  }
                }
              }
            }
          }
        },
        "last_updated" : { "type" : "number", "o:length" : 16, "o:preferred_column_name" : "last_updated" }
      }
    }');
END;
/

desc v_station_status;

-- Get the latest message from Kafka
select "station_id", "num_bikes_available", timestamp
from v_station_status
where timestamp in (
  select max(timestamp)
  from station_status
  where timestamp < current_timestamp - interval '10' second
);

-- Join to station lookup table. Take a better look at this in Zeppelin
WITH latest_status as (
  select "station_id" as station_id,
         "num_bikes_available" as num_bikes_available,
         timestamp
  from v_station_status
  where timestamp in (
    select max(timestamp)
    from station_status
    where timestamp < current_timestamp - interval '100' second
  )
)
select l.station_id,
       s.station_name,
       num_bikes_available
from latest_status l, stations s
where l.station_id = s.station_id
and region_id = 70;

--
-- Lab 500
-- Security
-- Row level security
-- Create a VPD Policy
-- Only look at the station information that you are in charge of
CREATE OR REPLACE FUNCTION BIKES.BDS_VPD_STATION (obj_schema varchar2, obj_name VARCHAR2)
RETURN VARCHAR2 AS
  p_emp    varchar2(100);
  p_retval varchar2(200);
BEGIN
  -- Bikes user is only allowed to see station info for Grove Street (3186)
  p_emp := sys_context('USERENV','AUTHENTICATED_IDENTITY');
  p_retval := case
                when p_emp = 'BIKES' then 'start_station_id=3186 or end_station_id=3186'
                else '1=1'
              end;
  RETURN p_retval;
END BDS_VPD_STATION;
/

-- Add the VPD Policy
BEGIN
  dbms_rls.add_policy(
    object_schema   => 'BIKES',
    object_name     => 'TRIPS',
    policy_name     => 'FILTER_TRIPS',
    function_schema => 'BIKES',
    policy_function => 'BDS_VPD_STATION',
    statement_types => 'select');
END;
/

select start_station_name, end_station_name, start_time, stop_time, bike_id
from trips
where rownum < 100;

-- Redact age
select start_station_name, end_station_name, start_time, bike_id, birth_year
from trips
where rownum < 100;

BEGIN
  DBMS_REDACT.ADD_POLICY(
    object_schema => 'BIKES',
    object_name   => 'TRIPS',
    column_name   => 'BIRTH_YEAR',
    policy_name   => 'redact_birth_year',
    function_type => DBMS_REDACT.FULL,
    expression    => q'[SYS_CONTEXT('USERENV','AUTHENTICATED_IDENTITY') = 'BIKES']'
  );
END;
/

select start_station_name, end_station_name, start_time, bike_id, birth_year
from trips
where rownum < 100;