pyspark-note
- pyspark-note
- Concept
- RDD Basic operation
- Deal with
JSON
data - Spark Dataframe
- Graph, edge, vertice, Graphframe
- spark-install-macos
- Reference
Concept
The records/items/elemets are stored in RDD(s).
Each RDD composists of Partitions
; each Partition
contains equal number of items
/elements
.
Where code runs
Source: https://spark.apache.org/docs/latest/cluster-overview.html
Most Python code runs in driver (in our local PC), except for code passed to RDD transformations.
- Transformations run at executors (in workers),
- actions run at executors and driver.
RDD Basic operation
RDD Programming Guide
==> https://spark.apache.org/docs/latest/rdd-programming-guide.html
Transformations
.map()
v.s. .mapPartitions()
v.s. .mapPartitionsWithIndex()
Spark的map,mapPartitions,mapPartitionsWithIndex詳解
==> https://blog.csdn.net/QQ1131221088/article/details/104051087
.map()
1. Return a new distributed dataset formed by passing each element of the source through a function func.
rdd_2 = sc.parallelize(range(10), 4)
new_rdd_2 = rdd_2.map(lambda x: str(x))
print('> .map() =\n', new_rdd_2.glom().collect())
print()
#################################################
from pyspark import TaskContext
result = rdd_2.map(lambda x :x+TaskContext().partitionId())
print('> Original RDD, rdd_2.glom().collect() =\n', rdd_2.glom().collect())
print()
print('> .map() with TaskContext().partitionId() =\n', result.glom().collect())
Output:
> .map() =
[['0', '1'], ['2', '3', '4'], ['5', '6'], ['7', '8', '9']]
> Original RDD, rdd_2.glom().collect() =
[[0, 1], [2, 3, 4], [5, 6], [7, 8, 9]]
> .map() with TaskContext().partitionId() =
[[0, 1], [3, 4, 5], [7, 8], [10, 11, 12]]
.mapPartitions()
2. Similar to .map()
, but runs separately on each partition (block) of the RDD, so func must be of type Iterator<T> => Iterator<U>
when running on an RDD of type T.
==> Divide-and-Conquer
algorithm => master node divides
the RDD into partitions, and distributes the partitions to workers, workers apply the same function to its partition. Then master node gets back (i.e. conquer
) the processed resuts from all workers.
(1)
rdd_2 = sc.parallelize([1,2,3,4,5,'a','b','c','d','e'], 4)
def func(itemsIteratorInPartition):
# apply this `func` to each partition (=the whole partition) of the RDD
yield str(itemsIteratorInPartition)
rdd_func = rdd_2.mapPartitions(func)
print('rdd_2.mapPartitions(func) =\n', rdd_func.glom().collect())
print()
Output:
rdd_2.mapPartitions(func) =
[['<itertools.chain object at 0x7ff8094580d0>'], ['<itertools.chain object at 0x7ff8094580d0>'], ['<itertools.chain object at 0x7ff8094580d0>'], ['<itertools.chain object at 0x7ff8094580d0>']]
(2)
def func_2(itemsIteratorInPartition):
# you loop through each item in each partition of the RDD
# = just apply this `func_2` to each item in each partition
for item in itemsIteratorInPartition:
yield str(item)
rdd_func_2 = rdd_2.mapPartitions(func_2)
print('rdd_2.mapPartitions(func_2) =\n', rdd_func_2.glom().collect())
Output:
rdd_2.mapPartitions(func_2) =
[['1', '2'], ['3', '4'], ['5', 'a'], ['b', 'c', 'd', 'e']]
.mapPartitionsWithIndex()
3. Similar to mapPartitions
, but also provides func with an integer value representing the index of the partition, so func must be of type (Int, Iterator<T>) => Iterator<U>
when running on an RDD of type T.
By using mapParitionsWithIndex you could output new elements which have their partition in it, then when you reduce you will know which partition you are handling the elements from.
==> https://stackoverflow.com/questions/31281225/find-out-the-partition-no-id
rdd_3 = sc.parallelize(range(10), 4)
# mapPartitionsWithIndex
def func(partitionIndex, itemsIteratorInPartition):
# apply this `func` to each partition (=the whole partition) of the RDD
yield (partitionIndex, sum(itemsIteratorInPartition))
new_rdd_3 = rdd_3.mapPartitionsWithIndex(func)
# glom() flattens elements on the same partition
print('> rdd_3.glom().collect() =', rdd_3.glom().collect())
print('> new_rdd_3.glom().collect() =', new_rdd_3.glom().collect())
################################################################################
def func_2(partitionIndex, itemsIteratorInPartition):
# you loop through each item in each partition of the RDD
# = just apply this `func_2` to each item in each partition
for item in itemsIteratorInPartition:
yield str(item+partitionIndex)
new_2_rdd_3 = rdd_3.mapPartitionsWithIndex(func_2)
# glom() flattens elements on the same partition
print()
print('>> new_2_rdd_3.glom().collect() =', new_2_rdd_3.glom().collect())
Output:
> rdd_3.glom().collect() = [[0, 1], [2, 3, 4], [5, 6], [7, 8, 9]]
> new_rdd_3.glom().collect() = [[(0, 1)], [(1, 9)], [(2, 11)], [(3, 24)]]
>> new_2_rdd_3.glom().collect() = [['0', '1'], ['3', '4', '5'], ['7', '8'], ['10', '11', '12']]
.map()
v.s. .flatmap()
print(sc.version, '\n')
py_list = [str(x) for x in range(5)]
rdd = sc.parallelize(py_list)
# map
new_rdd = rdd.map(lambda item: item+'xx')
print('.map() =\n', new_rdd.collect())
print()
# flatmap
# same as .map(), but flatten the results before returns
# i.e. remove all `list of list`/`nested list`
new_rdd = rdd.flatMap(lambda item: item+'xx')
print('.flatmap() =\n', new_rdd.collect())
Output:
3.1.2
.map() =
['0xx', '1xx', '2xx', '3xx', '4xx']
.flatmap() =
['0', 'x', 'x', '1', 'x', 'x', '2', 'x', 'x', '3', 'x', 'x', '4', 'x', 'x']
.foreach()
v.s. .map()
==> See .foreach()
/ .map()
v.s. .foreach()
Actions
Below example shows there are 100 items/elements in this RDD, and this RDD is partitioned into 4 partitions (or items are grouped in 4 partitions).
sc.parallelize()
Store python list [0,1,...,99]
as RDD in Spark. This dataset is not loaded in memory. It is merely a pointer to the Python py_list
.
# Stores python list `[0,1,...,99]` as RDD in Spark
## This dataset is not loaded in memory
## It is merely a pointer to the Python `py_list`
py_list = range(100)
rdd = sc.parallelize(py_list)
print(rdd)
Output:
PythonRDD[11] at RDD at PythonRDD.scala:53
.count()
Returns the number of items in this RDD
#.count()
# Shows the number of items in this RDD
print('rdd.count()=', rdd.count())
Output:
100
.collect()
Returns all the items in this RDD as python list
#.collect()
# Returns all the items in this RDD as python list
print('rdd.collect()=', rdd.collect())
Output:
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99]
.glom().collect()
Returns the content of each partitions as nested list
/ list of list
# Returns the content of each partitions as `nested list` / `list of list`
rdd.glom().collect()
Output:
[
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24],
[25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49],
[50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74],
[75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99]
]
.getNumPartitions()
Returns number of partitions in this RDD
#.getNumPartitions()
# Gets number of partitions in this RDD
print('rdd.getNumPartitions()=', rdd.getNumPartitions())
Output:
4
.foreach()
Just executes inside function for each data element in the RDD, but return NOTHING.
.map()
v.s. .foreach()
Short answer
.map()
:- is for transforming one RDD into another, then return the transformed.
- Return a new RDD by applying a function to each element of this RDD.
.foreach()
: is for applying an operation/function on all elements of this RDD.Note: RDD = 1 collection of elements
==> is-there-a-difference-between-foreach-and-map
Long answer:
The important difference between them is that map
accumulates all of the results into a collection, whereas foreach
returns nothing. map
is usually used when you want to transform a collection of elements with a function, whereas foreach
simply executes an action for each element.
In short,
foreach
is for applying an operation on each element of a collection of elements, whereasmap
is for transforming one collection into another.foreach
works with a single collection of elements. This is the input collection.map
works with two collections of elements: the input collection and the output collection.
.reduce()
rdd = sc.parallelize([('a',7),('a',2),('b',2)])
rdd.reduce(lambda a, b: a + b) #Merge the rdd values
('a',7,'a',2,'b',2)
Spark recomputes transformations
Transformed RDD is thrown away from memory after execution. If afterward transformations/actions need it, PySpark recompiles it.
Image. RDD Without vs With .cache()
/ .persist()
P.S. Solution: .cache()
/.persist()
the transformed RDD
A = sc.parallelize(range(1, 1000))
t = 100
B = A.filter(lambda x: x*x < t)
print('B.collect()=', B.collect()) # B.collect()= [1, 2, 3, 4, 5, 6, 7, 8, 9]
## Here: B finishes execution and is thrown away from memory
t = 200
C = B.filter(lambda x: x*x < t) # C needs B again, so recomputes B, but t=200 not =100
# So,
# B = A.filter(lambda x: x*x < 200)
# C = B.filter(lambda x: x*x < 200)
print('C.collect()=', C.collect()) # C.collect()= [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14]
print('C.count()=', C.count()) # C.count()= 14
.cache()
/.persist()
A = sc.parallelize(range(1, 1000))
t = 100
B = A.filter(lambda x: x*x < t)
print('B.collect()=', B.collect()) # B.collect()= [1, 2, 3, 4, 5, 6, 7, 8, 9]
# save this RDD in memory/disk
B.cache()
# B.persist()
## Here: B is in memory
t = 200
C = B.filter(lambda x: x*x < t) # C needs B again, memory stores B, NO need to recompute B
# So,
# B = previous B
# C = B.filter(lambda x: x*x < 200)
print('C.collect()=', C.collect()) # C.collect()= [1, 2, 3, 4, 5, 6, 7, 8, 9]
print('C.count()=', C.count()) # C.count()= 9
RDD - Closure
https://mycupoftea00.medium.com/understanding-closure-in-spark-af6f280eebf9
https://spark.apache.org/docs/latest/rdd-programming-guide.html#understanding-closures-
Closure example
global
variable as counter
Incorrect way - Q: Why printed counter is 0?
Ans: Because
- each executor (i.e. worker node) just applies
increment_counter()
func on its own copy of counter. - Also,
.foreach()
returns nothing
counter = 0
rdd = sc.parallelize(range(10))
print('rdd.collect() =', rdd.collect())
# Wrong: Don't do this!!
def increment_counter(x):
global counter
counter += x
rdd.foreach(increment_counter)
print('counter =', counter) # just print out `counter` from your driver program, not from Spark
Output:
rdd.collect() = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
counter = 0
rdd.sum()
Correct way (1) - Correct way to do the above operation:
- The
.sum()
action is executed in Spark executor(s) .sum()
returns the sum value
print('rdd.sum() =', rdd.sum())
Output:
rdd.sum() = 45
.accumulator()
Correct way (2) - Use .accumulator()
can also solve the issue.
rdd = sc.parallelize(range(10))
accum = sc.accumulator(0)
def g(x):
global accum
accum += x
a = rdd.foreach(g)
print(accum.value)
45
Note
Update in transformations may be applied more than once if tasks or job stages are re-executed.
rdd = sc.parallelize(range(10))
accum = sc.accumulator(0)
def g(x):
global accum
accum += x
return x * x
a = rdd.map(g)
print(type(accum))
print(accum.value) # 0, because no action presents, `accum` is not immediately computed (= laziness/lazy execution)
# print(a.reduce(lambda x, y: x+y))
a.cache()
tmp = a.count()
print(accum.value) # 45
print(rdd.reduce(lambda x, y: x+y)) # 45
tmp = a.count()
print(accum.value) # 45
print(rdd.reduce(lambda x, y: x+y)) # 45
Output:
<class 'pyspark.accumulators.Accumulator'>
0 #why it is 0? because of "lazy execution", if no actions present, "accum" is not compiled immediately
45
45
45
45
Accumulator
https://spark.apache.org/docs/latest/rdd-programming-guide.html#accumulators
Note from lecture note: Suggestion: Avoid using accumulators whenever possible. Use reduce() instead.
JSON
data
Deal with ** Most useful ==> https://zhuanlan.zhihu.com/p/267353998
JSON
string to
Read JSON
to spark Dataframe first
Read https://sparkbyexamples.com/pyspark/pyspark-maptype-dict-examples/
Steps,
- JSON from API
- get
list of dict
- pySpark dataframe with
map type
- access PySpark MapType Elements
Details
# The nested json / list of dictionary data_json = [ ('James', {'hair': 'black', 'eye': 'brown'}), ('Michael', {'hair': 'brown', 'eye': None}), ('Robert', {'hair': 'red', 'eye': 'black'}), ('Washington', {'hair': 'grey', 'eye': 'grey'}), ('Jefferson', {'hair': 'brown', 'eye': ''}) ] df = spark.createDataFrame(data=data_json) df.printSchema()
Output:
root |-- Name: string (nullable = true) |-- properties: map (nullable = true) | |-- key: string | |-- value: string (valueContainsNull = true) +----------+-----------------------------+ |Name |properties | +----------+-----------------------------+ |James |[eye -> brown, hair -> black]| |Michael |[eye ->, hair -> brown] | |Robert |[eye -> black, hair -> red] | |Washington|[eye -> grey, hair -> grey] | |Jefferson |[eye -> , hair -> brown] | +----------+-----------------------------+
Access the elements in map datatype
Method (1):
df3 = df.rdd.map(lambda x: \ (x.name, x.properties["hair"], x.properties["eye"])) \ .toDF(["name", "hair", "eye"]) df3.printSchema() df3.show()
OR
Method (2):
df.withColumn("hair", df.properties.getItem("hair")) \ .withColumn("eye", df.properties.getItem("eye")) \ .drop("properties") \ .show() df.withColumn("hair", df.properties["hair"]) \ .withColumn("eye", df.properties["eye"]) \ .drop("properties") \ .show()
Output:
root |-- name: string (nullable = true) |-- hair: string (nullable = true) |-- eye: string (nullable = true) +----------+-----+-----+ | name| hair| eye| +----------+-----+-----+ | James|black|brown| | Michael|brown| null| | Robert| red|black| |Washington| grey| grey| | Jefferson|brown| | +----------+-----+-----+
Spark Dataframe
.csv
Create sparkdf by reading customer.csv
:
CUSTKEY,NAME,ADDRESS,NATIONKEY,PHONE,ACCTBAL,MKTSEGMENT,COMMENT
1,Customer#000000001,IVhzIApeRb ot,c,E,15,25-989-741-2988,711.56,BUILDING,to the even, regular platelets. regular, ironic epitaphs nag e,
2,Customer#000000002,XSTf4,NCwDVaWNe6tEgvwfmRchLXak,13,23-768-687-3665,121.65,AUTOMOBILE,l accounts. blithely ironic theodolites integrate boldly: caref,
3,Customer#000000003,MG9kdTD2WBHm,1,11-719-748-3364,7498.12,AUTOMOBILE, deposits eat slyly ironic, even instructions. express foxes detect slyly. blithely even accounts abov,
4,Customer#000000004,XxVSJsLAGtn,4,14-128-190-5944,2866.83,MACHINERY, requests. final, regular ideas sleep final accou,
...
orders.csv
:
ORDERKEY,CUSTKEY,ORDERSTATUS,TOTALPRICE,ORDERDATE,ORDERPRIORITY,CLERK,SHIPPRIORITY,COMMENT
1,370,O,172799.49,1996-01-02,5-LOW,Clerk#000000951,0,nstructions sleep furiously among ,
2,781,O,38426.09,1996-12-01,1-URGENT,Clerk#000000880,0, foxes. pending accounts at the pending, silent asymptot,
3,1234,F,205654.30,1993-10-14,5-LOW,Clerk#000000955,0,sly final accounts boost. carefully regular ideas cajole carefully. depos,
4,1369,O,56000.91,1995-10-11,5-LOW,Clerk#000000124,0,sits. slyly regular warthogs cajole. regular, regular theodolites acro,
5,445,F,105367.67,1994-07-30,5-LOW,Clerk#000000925,0,quickly. bold deposits sleep slyly. packages use slyly,
...
dfCustomer = spark.read.csv('customer.csv', header=True, inferSchema=True)
dfOrders = spark.read.csv('orders.csv', header=True, inferSchema=True)
.printSchema()
in df
dfCustomer.printSchema()
print(dfCustomer.count())
dfOrders.printSchema()
print(dfOrders.count())
Output:
root
|-- CUSTKEY: integer (nullable = true)
|-- NAME: string (nullable = true)
|-- ADDRESS: string (nullable = true)
|-- NATIONKEY: string (nullable = true)
|-- PHONE: string (nullable = true)
|-- ACCTBAL: string (nullable = true)
|-- MKTSEGMENT: string (nullable = true)
|-- COMMENT: string (nullable = true)
1500
root
|-- ORDERKEY: integer (nullable = true)
|-- CUSTKEY: integer (nullable = true)
|-- ORDERSTATUS: string (nullable = true)
|-- TOTALPRICE: double (nullable = true)
|-- ORDERDATE: string (nullable = true)
|-- ORDERPRIORITY: string (nullable = true)
|-- CLERK: string (nullable = true)
|-- SHIPPRIORITY: integer (nullable = true)
|-- COMMENT: string (nullable = true)
15000
.groupBy().count()
Find count of orders
of each customer CUSTKEY
has:
dfOrders_groupby = dfOrders.groupBy('CUSTKEY').count()
dfOrders_groupby.toPandas()
Output:
CUSTKEY count
0 463 20
1 1342 20
2 496 18
3 148 15
4 1088 7
... ... ...
995 401 12
996 517 25
997 422 12
998 89 7
999 1138 23
1000 rows × 2 columns
df.createOrReplaceTempView("sql_table")
, allows to run SQL queries once register df
as temporary tables
dfOrders_groupby.createOrReplaceTempView("sql_table")
# Can run SQL query on it
df = spark.sql("SELECT customer.CUSTKEY, orders.count FROM customer left outer join orders on customer.CUSTKEY = orders.CUSTKEY")
df.toPandas()
Output:
CUSTKEY count
0 1 9.0
1 2 10.0
2 3 NaN
3 4 31.0
4 5 9.0
... ... ...
1495 1496 9.0
1496 1497 NaN
1497 1498 20.0
1498 1499 21.0
1499 1500 NaN
1500 rows × 2 columns
.join()
/spark.sql()
dataframes
.join()
# join 2 df by `CUSTKEY`
joined_df = dfCustomer.join(dfOrders, on='CUSTKEY', how='leftouter')
df2 = joined_df.select('CUSTKEY', 'ORDERKEY').sort(asc('CUSTKEY'),desc('ORDERKEY')) #ascending by 'CUSTKEY', descending by 'ORDERKET'
df2.toPandas() #view
how
: str, optional default inner
. Must be one of: inner
, cross
, outer
, full
, fullouter
, full_outer
, left
, leftouter
, left_outer
, right
, rightouter
, right_outer
, semi
, leftsemi
, left_semi
, anti
, leftanti
and left_anti
.
spark.sql()
+ df.createOrReplaceTempView("sql_table")
dfOrders.createOrReplaceTempView("orders")
dfCustomer.createOrReplaceTempView("customer")
# Can run SQL query on it
df2 = spark.sql("SELECT customer.CUSTKEY, orders.ORDERKEY FROM customer left outer join orders on customer.CUSTKEY = orders.CUSTKEY")
df2.toPandas()
Output:
CUSTKEY ORDERKEY
0 1 53283.0
1 1 52263.0
2 1 43879.0
3 1 36422.0
4 1 34019.0
... ... ...
15495 1499 7301.0
15496 1499 3523.0
15497 1499 1472.0
15498 1499 1252.0
15499 1500 NaN
15500 rows × 2 columns
df1.union(df2)
concat 2 dataframes
The dataframes may need to have identical columns, in which case you can use withColumn()
to create normal_1
and normal_2
df_concat = df_1.union(df_2)
df.withColumn
, user defined function
UDF Graph, edge, vertice, Graphframe
Credit to link
GraphFrame(v, e)
, Create GraphFrame
# Vertics DataFrame
v = spark.createDataFrame([
("a", "Alice", 34),
("b", "Bob", 36),
("c", "Charlie", 37),
("d", "David", 29),
("e", "Esther", 32),
("f", "Fanny", 38),
("g", "Gabby", 60)
], ["id", "name", "age"])
# Edges DataFrame
e = spark.createDataFrame([
("a", "b", "friend"),
("b", "c", "follow"), # b and c follow each other
("c", "b", "follow"), #
("f", "c", "follow"),
("e", "f", "follow"),
("e", "d", "friend"),
("d", "a", "friend"),
("a", "e", "friend"),
("g", "e", "follow")
], ["src", "dst", "relationship"])
# Create a GraphFrame
g = GraphFrame(v, e)
g.vertices.show()
g.edges.show()
# Vertics DataFrame
v = spark.createDataFrame([
("a", "Alice", 34),
("b", "Bob", 36),
("c", "Charlie", 37),
("d", "David", 29),
("e", "Esther", 32),
("f", "Fanny", 38),
("g", "Gabby", 60)
], ["id", "name", "age"])
# Edges DataFrame
e = spark.createDataFrame([
("a", "b", "follow"),
("c", "a", "friend"),
("b", "c", "follow"),
("d", "a", "follow"),
("f", "c", "follow"),
("f", "d", "follow"),
("f", "b", "follow"),
("c", "d", "follow"),
("g", "a", "friend"),
("g", "d", "friend"),
("g", "c", "friend"),
("e", "a", "follow"),
("e", "d", "follow")
], ["src", "dst", "relationship"])
# Create a GraphFrame
g = GraphFrame(v, e)
GraphFrame
Explore Credit to link
g.triplets.show() # display all
g.vertices.show() # display vertices
g.edges.show() # display edges
g.degrees.show()
g.inDegrees.show()
g.outDegrees.show()
g.filterVerices()
g.filterEdges()
Filter, Returns GraphFrame
, not DataFrame
.
Credit to link
g.filterVerices("columnName > 30")
g.filterEdges("columnName = 30")
g.dropIsolatedVertices() #Drop isolated vertices (users) which are not contained in any edges (relationships).
#Vertices without incoming / outgoing edges
.find("(a)-[e]->(b)")
, Motif finding
Find the edges e
from vertex a
to vertex b
.
P.S. .find()
returns sparkDF
DataFrame.
Credit to link
g.find("(a)-[]->(b);(b)-[]->(a)").filter("a.id < b.id") # A and B follow/friend each other;
# .filter() out "B follows/friends back A" rows,
# just keeps "A follows/friends B" rows
g.find("(a)-[]->(b); !(b)-[]->(a)").filter("a.id < b.id") # jsut A follows B, B not follows A
g.find("!()-[]->(a)") # find vertices without incoming edges
g.find("(a)-[e]->(b)").filter("e.relationship = 'follow'") # find A follows B,
Subgraphs
Credit to msbd5003
.
# Build subgraph based on conditions, i.e. subgraph contains (v,e)
# Select subgraph of users older than 30, and relationships of type "friend".
# Drop isolated vertices (users) which are not contained in any edges (relationships).
g1 = g.filterVertices("age > 30").filterEdges("relationship = 'friend'")\
.dropIsolatedVertices()
g1.vertices.show()
g1.edges.show()
Output:
+---+------+---+
| id| name|age|
+---+------+---+
| e|Esther| 32|
| b| Bob| 36|
| a| Alice| 34|
+---+------+---+
+---+---+------------+
|src|dst|relationship|
+---+---+------------+
| a| e| friend|
| a| b| friend|
+---+---+------------+
spark-install-macos
Run the following in macOS terminal,
How to start Jupyter Notebook with Spark + GraphFrames
Start it locally
Modify the PATH variables,
$ nano ~/.bashrc
Add the following lines in
~/.bashrc
, sospark
andjupyter notebook
can be launched at the same time.# Setting PATH for Spark 3.1.2 export SPARK_HOME=/usr/local/Cellar/apache-spark/3.1.2/libexec export PATH="$SPARK_HOME/bin/:$PATH" export PYSPARK_DRIVER_PYTHON="jupyter" export PYSPARK_DRIVER_PYTHON_OPTS="notebook"
In terminal,
Update $PATH variable,
$ source ~/.bashrc
Ensure
graphframes-0.8.1-spark3.0-s_2.12.jar
presents in the/Users/<USER_NAME>/.ivy2/jars
folder:Start the
pyspark
withgraphframes
in terminal,==> Needs 2 flags,
--packages
and--jars
==> Ensure the
graphframes
package name is same asgraphframes-0.8.1-spark3.0-s_2.12.jar
in folder.=> Deal with error:
java.lang.ClassNotFoundException: org.graphframes.GraphFramePythonAPI
=> https://blog.csdn.net/qq_42166929/article/details/105983616
$ pyspark --packages graphframes:graphframes:0.8.1-spark3.0-s_2.12 --jars graphframes-0.8.1-spark3.0-s_2.12.jar
Terminal output:
Jupyter Notebook:
Start in Google Colab
[Still need to investigate]
https://github.com/cenzwong/tech/tree/master/Note/Spark#graphframe
Use MongoDB in Spark
Terminal:
pyspark --packages graphframes:graphframes:0.8.1-spark3.0-s_2.12 --jars graphframes-0.8.1-spark3.0-s_2.12.jar \
--conf "spark.mongodb.input.uri=mongodb://127.0.0.1/test.application_test?readPreference=primaryPreferred" \
--conf "spark.mongodb.output.uri=mongodb://127.0.0.1/test.application_test" \
--packages org.mongodb.spark:mongo-spark-connector_2.12:3.0.1
Notebook:
from pymongo import MongoClient
def _saveDfToMongoDB(sparkDF, mongoCollection):
sparkDF.cache()
print(f"Storing {sparkDF.count()} {mongoCollection} to db")
start = datetime.now()
sparkDF.write.format("mongo") \
.mode("append") \
.option("database", "msbd5003") \
.option("collection", mongoCollection) \
.save()
end = datetime.now()
spent = (end - start).total_seconds()
print(f"Stored, time used: {spent} s")
df = spark.read.json("file path")
_saveDfToMongoDB(df, "mongodb collection name")
Test Spark in Jupyter Notebook
Inside Jupyter Notebook:
==> Reference: https://medium.com/@roshinijohri/spark-with-jupyter-notebook-on-macos-2-0-0-and-higher-c61b971b5007
Cell 1:
print(sc.version)
Output 1:
3.1.2
OR
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from graphframes import *
spark = SparkSession.builder.master("local[*]").getOrCreate()
sc = spark.sparkContext
sc
Output:
SparkSession - hive
SparkContext
Spark UI
Version
v3.1.2
Master
local[*]
AppName
PySparkShell
Cell 2:
path_to_the_jar_file = 'graphframes-0.8.1-spark3.0-s_2.12.jar'
sc.addPyFile(path_to_the_jar_file)
Cell 3:
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
v = sqlContext.createDataFrame([("a", ),("b", ),], ["id", ])
e = sqlContext.createDataFrame([("a", "b"),], ["src", "dst"])
from graphframes import *
g = GraphFrame(v, e)
g.inDegrees.show()
Output 3:
+---+--------+
| id|inDegree|
+---+--------+
| b| 1|
+---+--------+
Cell 4:
import pyspark
from pyspark.sql.session import SparkSession
spark = SparkSession.builder.appName("spark test").getOrCreate()
columns = ['id', 'dogs', 'cats']
vals = [
(1, 2, 0),
(2, 0, 1)
]
# create DataFrame
df = spark.createDataFrame(vals, columns)
df.show()
Output 4:
+---+----+----+
| id|dogs|cats|
+---+----+----+
| 1| 2| 0|
| 2| 0| 1|
+---+----+----+
Reference
Official pyspark example: https://github.com/spark-examples/pyspark-examples
ntroduction to Spark
Background
Hadoop
First open-source project for distributed storage, resource management and computing. Consist of 3 components:
- HDFS -> Distributed Storage
- YARN -> Resource Management
- Map Reduce -> Computing in parallel process (mostly written in Java or can also written in Python)
- Hadoop 1.0: consists of HDFS and Map Reduce (taking care of resource management)
- Hadoop 2.0: inclusive of YARN
- Hadoop 3.0: Improved storage/HDFS layer
Spark start to emerge and dominate over MapReduce in market, for a few reasons:
- Spark is In-Memory(RAM) processing framework, whereas MapReduce process in HDD/SSD, thus Spark has better performance than MapReduce.
- Hadoop MapReduce performance is related to HDD/SSD space. Say working on 50 hadoop node clusters (50 servers connected in parallel), if the hard disks space of those servers getting small, the CPU utilization will be affected and computing performance will drop, unless more and more clusters are continuously connected in parallel (horizontal scaling up).
- Since Hadoop makes use of servers for both computing and storage, the servers cannot be shut down, otherwise there will be data loss. So the deployment on cloud is expensive, as the node clusters must be running always. A better choice is to separate computing and storage.
- Memory card becomes cheaper nowadays. Spark can run in standalong mode, then we can have separate storage, so we don't have to integrate it with Hadoop.
Spark support four langurages:
- Scala (Most widely used since Spark itself is written in Scala, better integration) -> spark-shell
- Java (2nd most popular lang for data engineering) -> No shell, no CLI
- Python (Mostly for data science application) -> pyspark
- R (less used) -> sparkR
Definition
Spark is an unified computing engine for parallel data processing on clusters. It supports batch processing, streaming processing, ML and SQL queries.
Low Level APIs of Spark
- RDD - Resilient Dsitributed Datasets (it's like a row of data/record)
- Broadcast (less used)
- Accumulator (less used)
High Level APIs of Spark - Structured APIs
- Dataframe - no fixed schema
- SQL - SQL statement for dataframe/dataset control (data define and manipulation)
- Dataset - has fixed schema
On top of High Level APIs, it supports:
- Structured Streaming.
- Advanced Analytics. (for ML use)
- Other Libraries.
Versions
- 2014 -> 1.0 (1.6) -> RDD
- 2016 -> 2.0 (2.1, 2.4) -> Dataframe, Dataset
- 2020 -> 3.0 -> Faster speed and more ML libraries
Driver vs Executor
- Spark Driver is the central coordinator and it communicates with all the Workers. It controls the flow of program.
- Each Worker node consists of one or more Executor(s) who are responsible for running the Task. Executors register themselves with Driver. The Driver has all the information about the Executors at all the time.
- A Spark Application consists of a Driver Program and a group of Executors on the cluster.
- The Driver is a process that executes the main program of your Spark application and creates the SparkContext that coordinates the execution of jobs. SparkSession is created within driver node.
- The executors are processes running on the worker nodes of the cluster which are responsible for executing the tasks the driver process has assigned to them.
- The cluster manager (such as Mesos or YARN) is responsible for the allocation of physical resources to Spark Applications SparkSession is created within driver node.
Start Spark in Command Line
- "spark-shell" is the command to start scala spark shell
- "pyspark" is the command to start python spark shell
- "sparkR" is the command to start R spark shell
- Java has no shell, no command line interface
Spark Session vs Spark Context
- Since earlier versions of Spark or Pyspark, SparkContext (JavaSparkContext for Java) is an entry point to Spark programming with RDD and to connect to Spark Cluster
- Since Spark 2.0 SparkSession has been introduced and became an entry point to start programming with DataFrame and Dataset.
No comments:
Post a Comment