
ScalableML - Pyspark

1. Lab1 and Background Notes

1.1 The Configuration Problems and Fixes

Mac - Switching Between Different Java Versions on Mac

1.First, check which Java versions are installed on your Mac. You can run this command:

$ /usr/libexec/java_home -V
Matching Java Virtual Machines (2):
    13, x86_64:	"Java SE 13"	/Library/Java/JavaVirtualMachines/jdk-13.jdk/Contents/Home
    1.8.0_241, x86_64:	"Java SE 8"	/Library/Java/JavaVirtualMachines/jdk1.8.0_241.jdk/Contents/Home

2.Next, visit the Oracle website and install the JDK (NOT the JRE). The default installation location is /Library/Java/JavaVirtualMachines, and you can find all of the installed JDKs there.

3.Next, update ~/.bash_profile to add the following lines to your default environment.

$ vi ~/.bash_profile
# Add below lines into the end of this file
export JAVA_8_HOME="$(/usr/libexec/java_home -v 1.8)"
export JAVA_13_HOME="$(/usr/libexec/java_home -v 13)"

alias jdk8="export JAVA_HOME=$JAVA_8_HOME"
alias jdk13="export JAVA_HOME=$JAVA_13_HOME"
# indicate the default jdk version
export JAVA_HOME=$JAVA_8_HOME

4.Reload ~/.bash_profile and switch your Java version with the aliases.

$ source ~/.bash_profile
$ jdk8 && java -version
java version "1.8.0_241"
Java(TM) SE Runtime Environment (build 1.8.0_241-b07)
Java HotSpot(TM) 64-Bit Server VM (build 25.241-b07, mixed mode)
$ jdk13 && java -version
java version "13" 2019-09-17
Java(TM) SE Runtime Environment (build 13+33)
Java HotSpot(TM) 64-Bit Server VM (build 13+33, mixed mode, sharing)

Install Spark (2.3.2) and Configure Jupyter Notebook on a Local Machine (Mac)

1.Go to this link to download any historical Apache Spark release and select the specific version you want.

2.Extract the downloaded file into /usr/local. If you don’t know how to access this path directly, open Finder and press Shift+Command+G. Then open ~/.bash_profile to add the Spark directories to the system path.

$ vim ~/.bash_profile
# Change the command to accord with your situation
export SPARK_HOME=/usr/local/{spark-2.3.2-bin-hadoop2.6}
export PATH="$PATH:$SPARK_HOME/bin"

3.Check the Python version on your computer and add the corresponding location information to ~/.bash_profile.

$ which python3
{Your Default Python Installation Location}
$ vim ~/.bash_profile
export PYSPARK_PYTHON="{Your Default Python Installation}"
# e.g. "/Library/Frameworks/Python.framework/Versions/3.5/bin/python3.5"
export PYSPARK_DRIVER_PYTHON="{Your Default iPython Installation}"
# e.g. "/Library/Frameworks/Python.framework/Versions/3.5/bin/ipython3"
export PYSPARK_DRIVER_PYTHON_OPTS="notebook"
alias pysbook='$SPARK_HOME/bin/pyspark'

4.After that, you can open a Jupyter Notebook with the pyspark command (or the pysbook alias). You can change the kernel as you like.

5.Enjoy your Pyspark!


Discussion of the qrshx Command on the HPC

1.The details of qrshx:

$ qrshx -P {Parameters}
# gpu: GPU-equipped nodes (public)
# rse: Research Software Engineering group's CPU-only nodes (restricted access)
# rse-training: For training courses run by the Research Software Engineering group (restricted access)
# insigneo-imsb: Insigneo Integrative Musculo-Skeletal Biomechanics (IMSB) subgroup's nodes (restricted access)
# security: Computer Science Security project (restricted access)
# cstest: CiCS testing only
				-pe openmp 4
# The number of cores
# The number of cores requested should be small (no more than 4 here)
				-l rmem=16G
# Request the amount of real memory
				-M {Your email address}
				-j y
# Merge normal and error outputs into a single file
				-l gpu=1
# The number of GPUs you want to use
# https://docs.hpc.shef.ac.uk/en/latest/sharc/GPUComputingShARC.html#gpucomputing-sharc
$ ./{my_program} --num-cores=2

2.The command should be consistent with the resource information shown in the JupyterHub server interface. If not, the resources will not be allocated to you.


The Command spark-submit

The spark-submit script in Spark’s bin directory is used to launch applications on a cluster.

For Python, you can use the --py-files argument of spark-submit to add .py, .zip or .egg files to be distributed with your application.

For general use, we can rely on this command to submit jobs to the cluster and request the resources they need.

Essential Parameters: --master (the cluster manager URL), --deploy-mode (client or cluster), --py-files (extra Python dependencies), --driver-memory, --executor-memory, --num-executors and --conf (arbitrary key=value Spark settings).

1.2 Background Information

Recap:

Spark-Overview

A Spark application can be interpreted as two parts: the Driver Program and the Application Master. At a high level it consists of creating the SparkContext, creating the RDDs and performing the remaining processing to reach the final result. These transformations of RDDs are then translated into a DAG and submitted to the scheduler to be executed on a set of worker nodes.

The driver handles the user program's requests and runs as a separate process that executes the user application; it also creates the SparkContext, communicates with the cluster manager and schedules job execution.

Spark-Core

RDDs(Resilient Distributed Dataset):

An RDD can be regarded as an immutable parallel data structure with failure-recovery possibilities. Spark provides APIs for materialization of data as well as for control over caching and partitioning of elements to optimize data placement. An RDD stores information about its parents to optimize execution (via pipelining of operations) and to recompute partitions in case of failure. Each RDD is split into multiple partitions (similar patterns with smaller sets of data), which may be computed on different nodes of the cluster.

Fault-tolerant: able to recompute missing or damaged partitions due to node failures.

Data residing on multiple nodes in a cluster.

A collection of partitioned elements, e.g. tuples or other objects (that represent records of the data you work with).

RDD has five main properties in Pyspark:

# a list of partitions (e.g. splits in Hadoop)
# i.e. the data inside an RDD is partitioned (split into partitions) and then distributed across nodes in a cluster (one partition per JVM, which may not correspond to a single node)
def getPartitions: Array[Partition]

# a list of dependencies on other RDDs
# Immutable or read-only
# - it does not change once created and can only be transformed using transformations into new RDDs
def getDependencies: Seq[Dependency[_]]

# a function for computing each split
def compute(split: Partition, context: TaskContext): Iterator[T]

# (optional) a list of preferred locations to compute each split on
def getPreferredLocations(split: Partition): Seq[String] = Nil

# (optional) a partitioner for key-value RDDs
partitioner: Option[Partitioner] = None

Next, we will talk about the details and give a demonstration of the functions used to initialize a PySpark RDD.

# Two main ways to create RDDs:
# 		+ load an external dataset(SQL/Hadoops)
# 		+ distributing a set of objects

from pyspark.sql import SparkSession
spark = SparkSession \
      .builder \
      .appName("Demo RDD Example") \
      .config("spark.some.config.option","some-value") \
      .master("local") \
      .getOrCreate()
      

sc = spark.sparkContext
df = spark.sparkContext.parallelize([(1,2,3,'a b c')]).toDF(['col1','col2','col3','col4'])
df.show()

# output
+----+----+----+-----+
|col1|col2|col3| col4|
+----+----+----+-----+
|   1|   2|   3|a b c|
+----+----+----+-----+

The SparkSession is the entry point to Spark and is used to create DataFrames from input data.

The .builder allows users to construct a new SparkSession instance.

The .config method sets configuration options for the application being created; options set using this method are automatically propagated to both the SparkConf and the SparkSession’s own configuration. In other words, it holds the configuration of the Spark application, used to set various Spark parameters as key-value pairs.

from pyspark import SparkConf
new_instance = SparkConf()
spark = SparkSession.builder.config(conf=new_instance).getOrCreate()

The .master sets the Spark master URL to connect to, such as “local” to run locally, “local[4]” to run locally with 4 cores, or 'spark://master:7077' to run on a Spark standalone cluster.

For more details, you can click here to learn about more RDD actions and transformations.

1.3 Some Actions and Transformations in PySpark

pyspark.sql.functions.split(str, pattern)

from pyspark.sql.functions import split

df = spark.createDataFrame([('slppp6.intermind.net - - [01/Aug/1995:00:00:11 -0400] "GET /history/skylab/skylab-small.gif HTTP/1.0" 200 9202',)],['value'])
split_logFile = split(df['value'], '- -')


pyspark.sql.DataFrame.select()
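A minimal sketch of select, reusing the log DataFrame df created in the split example above (the column name value is taken from that example):

# select a single column, either by name or by Column object
df.select('value').show(truncate=False)
df.select(df['value']).show(truncate=False)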


pyspark.sql.DataFrame.withColumn(colName,col)

from pyspark.sql.functions import split

df = spark.createDataFrame([('slppp6.intermind.net - - [01/Aug/1995:00:00:11 -0400] "GET /history/skylab/skylab-small.gif HTTP/1.0" 200 9202',)],['value'])
split_logFile = split(df.value, '- -')
new_logFile = df.withColumn('URL', split_logFile.getItem(0))


pyspark.sql.DataFrame.dropDuplicates()

new_logFile.select('URL').dropDuplicates().count()


pyspark.sql.DataFrame.orderBy(*cols, **kwargs)

result = new_logFile.groupBy('URL').count().orderBy("count", ascending=False).take(1)
# take(1) returns a list of Row objects
result[0][0]


pyspark.sql.DataFrame.limit(num)
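A small sketch of limit, assuming the new_logFile DataFrame built in the withColumn example above:

# return a new DataFrame containing only the first 5 rows
new_logFile.limit(5).show()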


pyspark.sql.DataFrame.collect()
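A small sketch of collect, again assuming the new_logFile DataFrame from above; collect() brings all rows back to the driver as a list of Row objects, so it should only be used on small results:

rows = new_logFile.select('URL').limit(3).collect()
# rows is a Python list of Row objects
rows[0]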


pyspark.sql.Row

>>> result?
Type:        list
String form: [Row(URL='edams.ksc.nasa.gov ', count=6530)]
Length:      1
Docstring:  
list() -> new empty list
list(iterable) -> new list initialized from iterable's items


pyspark.sql.DataFrame.groupBy()

v = sc.parallelize([(1,'a','bn'),(2,'b','mn'),(2,'a','nm')]).toDF(['num','rule','str'])
v.groupby('rule').count().collect()
# output
[Row(rule='b', count=1), Row(rule='a', count=2)]


pyspark.SparkContext.parallelize(c, numSlices=None)
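A short sketch of parallelize with an explicit number of partitions (numSlices):

rdd = sc.parallelize(range(10), numSlices=4)
rdd.getNumPartitions()
# output
# 4
rdd.glom().collect()
# shows the elements grouped per partition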


pyspark.sql.DataFrame.filter()
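A quick sketch of filter on the new_logFile DataFrame assumed above (keep only the rows whose URL column is non-empty):

new_logFile.filter(new_logFile['URL'] != '').count()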


pyspark.RDD.map()

>>> demo = spark.sparkContext.parallelize([1,2,3,4,5,6])
>>> demo.map(lambda x: x+1).collect()
[2, 3, 4, 5, 6, 7]


pyspark.sql.functions.regexp_extract(str, pattern, idx)

>>> from pyspark.sql.functions import regexp_extract
>>> df = spark.createDataFrame([('slppp6.intermind.net - - [01/Aug/1995:00:00:11 -0400] "GET /history/skylab/skylab-small.gif HTTP/1.0" 200 9202',)],['value'])
>>> df.select(regexp_extract('value', r'(\d+)(\s)(\d+|-)', 1)).collect()
# You can replace this line with the line below (passing the Column object instead of its name)
# df.select(regexp_extract(df['value'], r'(\d+)(\s)(\d+|-)', 1)).collect()


pyspark.RDD.reduceByKey(func, numPartitions=None, partitionFunc=<function portable_hash>)

# An example of reduceByKey with an explicit number of partitions
list_v = [("a",1),("b",1),("a",1)]
rdd = sc.parallelize(list_v)
# All the pair RDD operations take an optional second parameter for the number of tasks
rdd.reduceByKey(lambda x, y: x + y, 5).collect()
# output
# [('a', 2), ('b', 1)]


pyspark.RDD.distinct()

df = spark.sparkContext.parallelize([1,2,1])
df.distinct().collect()
# output
[1,2]


pyspark.RDD.persist()

lines = sc.textFile("hdfs://…")
errors = lines.filter(lambda line: line.startswith("Error"))
errors.persist()
errors.count()
# Without persist(), doing errors.count() again would load the file again and compute it again
# persist() tells Spark to cache the data in memory, to reduce the data-loading cost of further actions on the same data
# errors.persist() on its own does nothing immediately - it is a lazy operation. The RDD now says "read this file and then cache the contents"; the next action will trigger computation and data caching.


pyspark.RDD.foreach(func)
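A minimal sketch of foreach; the function runs on the executors for its side effects and returns nothing to the driver:

def f(x):
    print(x)

sc.parallelize([1, 2, 3, 4, 5]).foreach(f)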


pyspark.Accumulator(aid, value, accum_param)

accum = sc.accumulator(0)
sc.parallelize(range(0,10)).foreach(lambda x: accum.add(x))
accum.value
# output
45


2. Lab2 and Background Information

2.1 MLlib - Spark’s machine learning (ML) library.

MLlib allows easy combination of numerous algorithms into a single pipeline using standardized APIs for machine learning algorithms. The key concepts are DataFrame, Transformer, Estimator, Pipeline and Parameter.

A list of some of the available ML features is available here.

For clarification on whether an Estimator is a Transformer, see Estimators.

In this part, I want to analyse the functions and data structures I encountered while using MLlib.


pyspark.ml.linalg.Vectors.dense(*elements)

from pyspark.ml.linalg import Vectors
Vectors.dense([1, 2])
# output
DenseVector([1.0, 2.0])


pyspark.RDD.randomSplit
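A short sketch of randomSplit, which splits an RDD randomly according to the supplied weights (the 80/20 split and the seed are just illustrative choices):

rdd = sc.parallelize(range(100))
train, test = rdd.randomSplit([0.8, 0.2], seed=42)
train.count(), test.count()
# roughly an 80/20 split of the 100 elements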


pyspark.ml.regression.LinearRegression

>>> from pyspark.ml.linalg import Vectors
>>> from pyspark.ml.regression import LinearRegression
>>> df = spark.createDataFrame([
    (1.0, 2.0, Vectors.dense(1.0)),
    (0.0, 2.0, Vectors.sparse(1, [], []))], ["label", "weight", "features"])
>>> lr = LinearRegression(maxIter=5, regParam=0.0, solver="normal", weightCol="weight")
>>> model = lr.fit(df)
>>> model.transform(df)


pyspark.ml.evaluation.RegressionEvaluator()
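A minimal sketch of RegressionEvaluator on a tiny hand-made predictions DataFrame (the label/prediction values are made up):

from pyspark.ml.evaluation import RegressionEvaluator

preds = spark.createDataFrame([(1.0, 0.9), (0.0, 0.2)], ["label", "prediction"])
evaluator = RegressionEvaluator(metricName="rmse", labelCol="label", predictionCol="prediction")
evaluator.evaluate(preds)
# returns the root mean squared error of the predictions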


pyspark.ml.feature.Tokenizer(inputCol=None, outputCol=None)

>>> from pyspark.ml.feature import Tokenizer
>>> df = spark.createDataFrame([("a b c",)], ["text"])
>>> tokenizer = Tokenizer(inputCol="text", outputCol="words")
>>> tokenizer.transform(df).head()
Row(text='a b c', words=['a', 'b', 'c'])


pyspark.ml.feature.HashingTF(numFeatures=262144, binary=False, inputCol=None, outputCol=None)
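A small sketch combining Tokenizer and HashingTF (numFeatures=16 is just a small illustrative value):

from pyspark.ml.feature import HashingTF, Tokenizer

df = spark.createDataFrame([("a b c a",)], ["text"])
words = Tokenizer(inputCol="text", outputCol="words").transform(df)
hashingTF = HashingTF(numFeatures=16, inputCol="words", outputCol="features")
hashingTF.transform(words).head().features
# a SparseVector of hashed term-frequency counts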


2.2 HDFS(Hadoop Distributed File System)

HDFS is a distributed file system designed to store large files spread across multiple physical machines and hard drives. Spark is a tool for running distributed computations over large datasets. Spark is a successor to the popular Hadoop MapReduce computation framework. Together, Spark and HDFS offer powerful capabilities for writing simple code that can quickly compute over large amounts of data in parallel.

HDFS is comprised of interconnected clusters of nodes where files and directories reside. An HDFS cluster consists of a single node, known as a NameNode, that manages the file system namespace and regulates client access to files. In addition, data nodes (DataNodes) store data as blocks within files.

Within HDFS, a given name node manages file system namespace operations like opening, closing, and renaming files and directories. A name node also maps data blocks to data nodes, which handle read and write requests from HDFS clients. Data nodes also create, delete, and replicate data blocks according to instructions from the governing name node.

(Figure: HDFS architecture)
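For reference, a hedged sketch of reading a file stored on HDFS from PySpark (the namenode address and file path are hypothetical):

logs = spark.read.text("hdfs://namenode:8020/data/access_log.txt")
logs.count()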

2.3 Pipelines

ML Pipelines: high-level APIs to create and tune machine learning pipelines.

(Figures: ML Pipeline and PipelineModel workflow)
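A minimal sketch of building and fitting a Pipeline (Tokenizer -> HashingTF -> LogisticRegression); the tiny training DataFrame is made up for illustration:

from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer

training = spark.createDataFrame([
    ("spark is great", 1.0),
    ("hadoop mapreduce", 0.0)], ["text", "label"])

tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol="words", outputCol="features")
lr = LogisticRegression(maxIter=10)

pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
model = pipeline.fit(training)
# fit() returns a PipelineModel, which is itself a Transformer
model.transform(training).select("text", "prediction").show()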


2.4 Word Count in Spark

(Figure: word count example in Spark)
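A minimal word-count sketch in PySpark; the input path is hypothetical:

text = sc.textFile("data/sample.txt")
counts = (text.flatMap(lambda line: line.split())
              .map(lambda word: (word, 1))
              .reduceByKey(lambda a, b: a + b))
counts.take(5)
# a list of (word, count) pairs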



3. Collaborative filtering in Machine Learning Usage

3.1 Description

General Description:

If you want to design an intelligent recommender system that analyses users' behaviour, you can rely on this technique. Such a system can learn from the collected data and give better recommendations.

These techniques aim to fill in the missing entries of a user-item association matrix primarily based on the matrix itself.


Collaborative filtering in Pyspark - ALS

spark.ml currently supports model-based collaborative filtering, in which users and products are described by a small set of latent factors that can be used to predict missing entries, using the alternating least squares (ALS) function.
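A minimal sketch of ALS on a toy ratings DataFrame (the user/movie ids and ratings are made up; rank, maxIter and regParam are just illustrative values):

from pyspark.ml.recommendation import ALS

ratings = spark.createDataFrame(
    [(0, 0, 4.0), (0, 1, 2.0), (1, 1, 3.0), (1, 2, 5.0)],
    ["userId", "movieId", "rating"])

als = ALS(rank=5, maxIter=5, regParam=0.1,
          userCol="userId", itemCol="movieId", ratingCol="rating")
model = als.fit(ratings)
model.transform(ratings).show()
# predicted ratings for the (user, item) pairs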


3.2 Principle

3.2.1 Singular Value Decomposition (SVD)

Eigenvalues and Eigenvectors:

\[Ax = \lambda x\]

Here $A$ is an $n×n$ real symmetric matrix and $x$ is an $n$-dimensional vector; we say that $\lambda$ is an eigenvalue of the matrix $A$ and $x$ is the corresponding eigenvector.

WHAT ARE THE BENEFITS of Eigenvalues and Eigenvectors?

If we perform an eigendecomposition of matrix $A$, we obtain $n$ eigenvalues ${\lambda_1 \leq \lambda_2 \leq \dots \leq \lambda_n}$ and their corresponding eigenvectors ${w_1, w_2, \dots, w_n}$. If these eigenvectors are linearly independent, the matrix $A$ can be represented by this decomposition:

\[A = W \Sigma W^{-1}\]

Here $W$ is the $n×n$ matrix whose columns are the eigenvectors, and $\Sigma$ is the $n×n$ diagonal matrix whose main diagonal contains the $n$ eigenvalues.


Eigenvector normalization

Normalizing the $n$ eigenvectors makes the columns of $W$ an orthonormal basis, satisfying $\|w_i\|_2 = 1$ and $w_i^T w_i = 1$. In that case $W^{-1} = W^T$, so $A = W \Sigma W^T$.

In this case, to perform the eigendecomposition, the matrix $A$ must be a square matrix. If $A$ is not square, that is, if the numbers of rows and columns are not the same, can we still decompose the matrix?

Now here comes our SVD.


SVD:

Assume there is a matrix of size $m×n$; its SVD can be represented by:

\[A = U \Sigma V^T\]

Here $U$ is an $m×m$ matrix, and $\Sigma$ is an $m×n$ matrix whose only non-zero entries are on the main diagonal; every element on the main diagonal is called a singular value. $V$ is an $n×n$ matrix. $U$ and $V$ are unitary matrices.

Solve the SVD:

We multiply the transpose of $A$ by the matrix $A$, obtaining a new $n×n$ matrix $A^TA$. Since $A^TA$ is a square matrix, we can perform an eigenvalue decomposition on it, and the resulting eigenvalues and eigenvectors satisfy this equation:

\[(A^TA)v_i = \lambda_iv_i\]

Now we can obtain the $n$ eigenvalues and $n$ eigenvectors of the matrix $A^TA$. We can use these eigenvectors to compose an $n×n$ matrix $V$. The eigenvectors composing $V$ are called the right singular vectors.

Next, in the same way, we generate another square matrix, the $m×m$ matrix $AA^T$. We perform an eigenvalue decomposition to obtain another set of eigenvalues and eigenvectors. These eigenvectors compose an $m×m$ matrix $U$, and they are called the left singular vectors.

\[(AA^T)u_i = \lambda_i u_i, \qquad U = [u_1, u_2, \dots, u_m]\]

The only matrix we haven't obtained is $\Sigma$. Its diagonal values are the singular values, so we just need to calculate each singular value.

\[A = U\Sigma V^T \Rightarrow AV = U\Sigma V^T V \Rightarrow AV = U\Sigma \Rightarrow Av_i = \sigma_i u_i \Rightarrow \sigma_i = Av_i / u_i\]

From the previous steps, and since $A^TA = V\Sigma^T\Sigma V^T$ (so the eigenvalues of $A^TA$ are the squared singular values), we can conclude that

\[\sigma_i = \sqrt{\lambda_i}\]
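As a quick numerical check (a NumPy sketch, not part of the Spark lab; the 3×2 matrix is arbitrary), the singular values of $A$ equal the square roots of the eigenvalues of $A^TA$:

import numpy as np

A = np.array([[0., 1.], [1., 1.], [1., 0.]])
singular_values = np.linalg.svd(A, compute_uv=False)   # descending order
eigvals = np.linalg.eigvalsh(A.T @ A)                   # ascending order
print(singular_values)            # approximately [1.732, 1.0]
print(np.sqrt(eigvals[::-1]))     # matches the singular values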
3.2.2 K-Fold Validation

4. K-means Clustering

4.1 Discussion

Definition
Procedure:

1.Cluster the data set into groups of different density.

2.Choose points in small cluster as candidate outliers.

3.Compute the distance between candidate points and non-candidate clusters.

4.If candidate points are far from all other non-candidate points, they are outliers.


Two Approaches to Generating Clusters

Partitional Clustering:

Hierarchical Clustering:

4.2 Types of Clusters

Centre-Based (Prototype-based)
Density-Based

4.3 Lloyd Algorithm

4.4 K-means++

4.5 K-means||

4.6 K-means in Pyspark

In PySpark, developers can rely on the pyspark.ml.clustering.KMeans estimator; a minimal sketch follows.
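A minimal sketch using pyspark.ml.clustering.KMeans; the four 2-D points and k=2 are made up for illustration:

from pyspark.ml.clustering import KMeans
from pyspark.ml.linalg import Vectors

data = spark.createDataFrame(
    [(Vectors.dense([0.0, 0.0]),), (Vectors.dense([1.0, 1.0]),),
     (Vectors.dense([9.0, 8.0]),), (Vectors.dense([8.0, 9.0]),)],
    ["features"])

kmeans = KMeans(k=2, seed=1)
model = kmeans.fit(data)
model.clusterCenters()
# the two learned centroids
model.transform(data).show()
# assigns each point to a cluster (the 'prediction' column)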
