Chào mừng các bạn quay trở lại với loạt bài Tự Học Tensorflow của tek4.vn. Bài viết này sẽ giới thiệu đến bạn Apache Spark là gì, nó hoạt động ra sao, khởi chạy nó như thế nào…Bắt đầu thôi
Xem thêm bài viết trước: Recurrent Neural Network – Ví dụ với TensorFlow
Apache Spark là gì?
Spark là một giải pháp dữ liệu lớn đã được chứng minh là dễ dàng hơn và nhanh hơn Hadoop MapReduce. Spark là một phần mềm mã nguồn mở được phát triển bởi phòng thí nghiệm UC Berkeley RAD vào năm 2009. Kể từ khi được ra mắt công chúng vào năm 2010, Spark đã trở nên phổ biến và được sử dụng trong ngành công nghiệp với quy mô chưa từng có.
Trong thời đại dữ liệu lớn, các nhà thực hành cần các công cụ nhanh và đáng tin cậy hơn bao giờ hết để xử lý luồng dữ liệu. Các công cụ trước đó như MapReduce được yêu thích nhưng rất chậm. Để khắc phục vấn đề này, Spark đưa ra một giải pháp vừa nhanh chóng vừa có mục đích chung. Sự khác biệt chính giữa Spark và MapReduce là nó chạy các tính toán trong bộ nhớ trong thời gian sau đó trên đĩa cứng. Nó cho phép truy cập và xử lý dữ liệu tốc độ cao, giảm thời gian từ hàng giờ xuống còn phút.
Pyspark là gì?
Spark là tên của công cụ để thực hiện tính toán cụm trong khi PySpark là thư viện của Python để sử dụng Spark.
Spark hoạt động như thế nào?
Spark dựa trên công cụ tính toán, có nghĩa là nó đảm nhiệm ứng dụng lập lịch, phân phối và giám sát. Mỗi tác vụ được thực hiện trên nhiều máy worker khác nhau được gọi là cụm máy tính. Một cụm máy tính đề cập đến việc phân chia các nhiệm vụ. Một máy thực hiện một nhiệm vụ, trong khi những máy khác đóng góp vào kết quả cuối cùng thông qua một nhiệm vụ khác. Cuối cùng, tất cả các nhiệm vụ được tổng hợp lại để tạo ra một đầu ra.
Spark được thiết kế để làm việc với:
- Python
- Java
- Scala
- SQL
Một tính năng quan trọng của Spark là có số lượng lớn thư viện tích hợp, bao gồm MLlib cho máy học. Nó cũng được thiết kế để hoạt động với các cụm Hadoop và có thể đọc nhiều loại tệp, bao gồm dữ liệu Hive, CSV, JSON, dữ liệu Casandra, v.v.
Cách cài đặt PySpark với AWS
Nhóm Jupyter xây dựng Docker image để chạy Spark một cách hiệu quả. Dưới đây là các bước bạn có thể làm theo để cài đặt phiên bản PySpark trong AWS.
Bước 1: Tạo một phiên bản
Trước hết ta cần tạo một instance. Truy cập tài khoản AWS của bạn và khởi chạy phiên bản. Bạn có thể tăng dung lượng lưu trữ lên đến 15g và sử dụng cùng một nhóm bảo mật như trong hướng dẫn của TensorFlow.
Bước 2: Mở kết nối
Mở kết nối và cài đặt bộ chứa docker. Lưu ý rằng, bạn cần ở đúng thư mục làm việc.
Chỉ cần chạy các mã này để cài đặt Docker:
1 2 3 4 5 |
sudo yum update -y sudo yum install -y docker sudo service docker start sudo user-mod -a -G docker ec2-user exit |
Bước 3: Mở lại kết nối và cài đặt Spark
Sau khi mở lại kết nối, bạn có thể cài đặt image containing PySpark.
1 2 3 4 5 6 7 8 |
## Spark docker run -v ~/work:/home/jovyan/work -d -p 8888:8888 jupyter/pyspark-notebook ## Allow preserving Jupyter notebook sudo chown 1000 ~/work ## Install tree to see our working directory next sudo yum install -y tree |
Bước 4: Mở Jupyter
Kiểm tra vùng chứa và tên của nó
1 |
docker ps |
Khởi chạy docker với nhật ký của docker theo sau là tên của docker. Ví dụ: docker logs zealous_goldwasser.
Truy cập trình duyệt của bạn và khởi chạy Jupyter. Địa chỉ là http://localhost:8888/. Dán mật khẩu do cmd cung cấp.
Lưu ý: nếu bạn muốn upload/download máy AWS của mình, bạn có thể sử dụng phần mềm Cyberduck
Cách cài đặt PySpark trên Windows / Mac với Conda
Sau đây là quy trình chi tiết về cách cài đặt PySpark trên Windows / Mac bằng Anaconda:
Để cài đặt Spark trên máy cục bộ của bạn, một phương pháp được khuyến nghị là tạo một conda environment mới.
Môi trường mới này sẽ cài đặt Python 3.6, Spark và tất cả các phụ thuộc.
Người dùng Mac
1 2 3 |
cd anaconda3 touch hello-spark.yml vi hello-spark.yml |
Người dùng Windows
1 2 3 |
cd C:\Users\Admin\Anaconda3 echo.>hello-spark.yml notepad hello-spark.yml |
Bạn có thể chỉnh sửa tệp .yml. Hãy thận trọng với phần thụt lề. Cần có hai dấu cách trước –
1 2 3 4 5 6 7 8 9 10 11 12 |
name: hello-spark dependencies: - python=3.6 - jupyter - ipython - numpy - numpy-base - pandas - py4j - pyspark - pytz |
Lưu nó và tạo môi trường. Tốn một chút thời gian
1 |
conda env create -f hello-spark.yml |
Bạn có thể kiểm tra tất cả môi trường được cài đặt trong máy của mình
1 |
conda env list |
1 |
Activate hello-spark |
Người dùng Mac
1 |
source activate hello-spark |
Người dùng Windows
1 |
activate hello-spark |
Lưu ý: Bạn đã tạo một môi trường TensorFlow cụ thể để chạy các hướng dẫn trên TensorFlow. Sẽ thuận tiện hơn khi tạo một môi trường mới khác với hello-tf.
Hãy tưởng tượng hầu hết dự án của bạn liên quan đến TensorFlow, nhưng bạn cần sử dụng Spark cho một dự án cụ thể. Bạn có thể đặt môi trường TensorFlow cho tất cả dự án của mình và tạo môi trường riêng cho Spark. Bạn có thể thêm bao nhiêu thư viện trong môi trường Spark tùy thích mà không cần can thiệp vào môi trường TensorFlow. Sau khi hoàn thành dự án của Spark, bạn có thể xóa nó mà không ảnh hưởng đến môi trường TensorFlow.
Jupyter
Mở Jupyter Notebook và thử xem PySpark có hoạt động không. Trong sổ tay mới, hãy dán mã mẫu PySpark sau:
1 2 3 |
import pyspark from pyspark import SparkContext sc =SparkContext() |
Nếu lỗi hiển thị, có thể là Java chưa được cài đặt trên máy của bạn. Trong mac, mở terminal và viết java -version, nếu có phiên bản java, hãy đảm bảo rằng nó là 1.8. Trong Windows, đi tới Application và kiểm tra xem có thư mục Java không. Nếu có một thư mục Java, hãy kiểm tra xem Java 1.8 đã được cài đặt chưa.
Nếu bạn cần cài đặt Java, bạn hãy truy cập link và tải xuống jdk-8u181-windows-x64.exe.
Đối với Người dùng Mac, nên sử dụng `brew.
1 2 |
brew tap caskroom/versions brew cask install java8 |
Spark Context
SparkContext là công cụ bên trong cho phép kết nối với các clusters. Nếu bạn muốn chạy một hoạt động, bạn cần có SparkContext.
Tạo một SparkContext
Trước hết, bạn cần khởi tạo SparkContext.
1 2 3 |
import pyspark from pyspark import SparkContext sc =SparkContext() |
Bây giờ SparkContext đã sẵn sàng, bạn có thể tạo một bộ sưu tập dữ liệu được gọi là RDD, Tập dữ liệu phân tán phục hồi (Resilient Distributed Dataset). Tính toán trong RDD được tự động song song trên toàn cluster.
1 |
nums= sc.parallelize([1,2,3,4]) |
Bạn có thể truy cập hàng đầu tiên
1 |
nums.take(1) |
1 |
[1] |
Bạn có thể áp dụng một phép chuyển đổi cho dữ liệu bằng một hàm lambda. Trong ví dụ PySpark bên dưới, bạn trả về bình phương của nums. Đó là một sự chuyển đổi map.
1 2 3 |
squared = nums.map(lambda x: x*x).collect() for num in squared: print('%i ' % (num)) |
1 2 3 4 |
1 4 9 16 |
SQLContext
Một cách thuận tiện hơn là sử dụng DataFrame. SparkContext đã được thiết lập, bạn có thể sử dụng nó để tạo dataFrame. Bạn cũng cần phải khai báo SQLContext.
SQLContext cho phép kết nối engine với các nguồn dữ liệu khác nhau. Nó được sử dụng để khởi tạo các hàm của Spark SQL.
1 2 3 4 |
from pyspark.sql import Row from pyspark.sql import SQLContext sqlContext = SQLContext(sc) |
Bây giờ, hãy tạo một danh sách các tuple. Mỗi tuple sẽ chứa tên của mọi người và tuổi của họ. Bốn bước được yêu cầu:
Bước 1) Tạo danh sách các tuple với thông tin.
1 |
[('John',19),('Smith',29),('Adam',35),('Henry',50)] |
Bước 2) Xây dựng RDD
1 |
rdd = sc.parallelize(list_p) |
Bước 3) Chuyển đổi các tuple
1 |
rdd.map(lambda x: Row(name=x[0], age=int(x[1]))) |
Bước 4) Tạo DataFrame context
1 2 3 4 5 |
sqlContext.createDataFrame(ppl) list_p = [('John',19),('Smith',29),('Adam',35),('Henry',50)] rdd = sc.parallelize(list_p) ppl = rdd.map(lambda x: Row(name=x[0], age=int(x[1]))) DF_ppl = sqlContext.createDataFrame(ppl) |
Nếu bạn muốn truy cập type của từng đặc trưng, bạn có thể sử dụng printSchema().
1 2 3 4 |
DF_ppl.printSchema() root |-- age: long (nullable = true) |-- name: string (nullable = true) |
Ví dụ về học máy với PySpark
Bây giờ bạn đã có ý tưởng ngắn gọn về Spark và SQLContext, bạn đã sẵn sàng xây dựng chương trình Máy học đầu tiên của mình.
Sau đây là các bước để xây dựng một chương trình Học máy với PySpark:
- Bước 1) Hoạt động cơ bản với PySpark
- Bước 2) Tiền xử lý dữ liệu
- Bước 3) Xây dựng pipeline xử lý dữ liệu
- Bước 4) Xây dựng bộ phân loại: logistic
- Bước 5) Đào tạo và đánh giá mô hình
- Bước 6) Điều chỉnh siêu tham số
chúng ta sẽ sử dụng tập dữ liệu adult dataset. Mục đích của hướng dẫn này là để học cách sử dụng Pyspark.
Bước 1) Hoạt động cơ bản với PySpark
Trước hết, bạn cần khởi tạo SQLContext.
1 2 3 4 5 |
#from pyspark.sql import SQLContext url = "data csv của bạn" from pyspark import SparkFiles sc.addFile(url) sqlContext = SQLContext(sc) |
Sau đó, bạn có thể đọc tệp cvs bằng sqlContext.read.csv. Sử dụng inferSchema được đặt thành True để yêu cầu Spark tự động đoán loại dữ liệu. Theo mặc định, nó chuyển thành False.
1 |
df = sqlContext.read.csv(SparkFiles.get("adult_data.csv"), header=True, inferSchema= True) |
Hãy xem kiểu dữ liệu
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
df.printSchema() root |-- age: integer (nullable = true) |-- workclass: string (nullable = true) |-- fnlwgt: integer (nullable = true) |-- education: string (nullable = true) |-- education_num: integer (nullable = true) |-- marital: string (nullable = true) |-- occupation: string (nullable = true) |-- relationship: string (nullable = true) |-- race: string (nullable = true) |-- sex: string (nullable = true) |-- capital_gain: integer (nullable = true) |-- capital_loss: integer (nullable = true) |-- hours_week: integer (nullable = true) |-- native_country: string (nullable = true) |-- label: string (nullable = true) |
Bạn có thể xem dữ liệu với show.
1 |
df.show(5, truncate = False) |
1 2 3 4 5 6 7 8 9 10 |
+---+----------------+------+---------+-------------+------------------+-----------------+-------------+-----+------+------------+------------+----------+--------------+-----+ |age|workclass |fnlwgt|education|education_num|marital |occupation |relationship |race |sex |capital_gain|capital_loss|hours_week|native_country|label| +---+----------------+------+---------+-------------+------------------+-----------------+-------------+-----+------+------------+------------+----------+--------------+-----+ |39 |State-gov |77516 |Bachelors|13 |Never-married |Adm-clerical |Not-in-family|White|Male |2174 |0 |40 |United-States |<=50K| |50 |Self-emp-not-inc|83311 |Bachelors|13 |Married-civ-spouse|Exec-managerial |Husband |White|Male |0 |0 |13 |United-States |<=50K| |38 |Private |215646|HS-grad |9 |Divorced |Handlers-cleaners|Not-in-family|White|Male |0 |0 |40 |United-States |<=50K| |53 |Private |234721|11th |7 |Married-civ-spouse|Handlers-cleaners|Husband |Black|Male |0 |0 |40 |United-States |<=50K| |28 |Private |338409|Bachelors|13 |Married-civ-spouse|Prof-specialty |Wife |Black|Female|0 |0 |40 |Cuba |<=50K| +---+----------------+------+---------+-------------+------------------+-----------------+-------------+-----+------+------------+------------+----------+--------------+-----+ only showing top 5 rows |
Nếu bạn không đặt inderShema thành True, đây là những gì đang xảy ra với type. Có tất cả trong chuỗi.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
df_string = sqlContext.read.csv(SparkFiles.get("adult.csv"), header=True, inferSchema= False) df_string.printSchema() root |-- age: string (nullable = true) |-- workclass: string (nullable = true) |-- fnlwgt: string (nullable = true) |-- education: string (nullable = true) |-- education_num: string (nullable = true) |-- marital: string (nullable = true) |-- occupation: string (nullable = true) |-- relationship: string (nullable = true) |-- race: string (nullable = true) |-- sex: string (nullable = true) |-- capital_gain: string (nullable = true) |-- capital_loss: string (nullable = true) |-- hours_week: string (nullable = true) |-- native_country: string (nullable = true) |-- label: string (nullable = true) |
Để chuyển đổi biến liên tục theo đúng định dạng, bạn có thể sử dụng các cột. Bạn có thể sử dụng withColumn để cho Spark biết cột nào sẽ hoạt động chuyển đổi.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 |
# Import all from `sql.types` from pyspark.sql.types import * # Write a custom function to convert the data type of DataFrame columns def convertColumn(df, names, newType): for name in names: df = df.withColumn(name, df[name].cast(newType)) return df # List of continuous features CONTI_FEATURES = ['age', 'fnlwgt','capital_gain', 'education_num', 'capital_loss', 'hours_week'] # Convert the type df_string = convertColumn(df_string, CONTI_FEATURES, FloatType()) # Check the dataset df_string.printSchema() root |-- age: float (nullable = true) |-- workclass: string (nullable = true) |-- fnlwgt: float (nullable = true) |-- education: string (nullable = true) |-- education_num: float (nullable = true) |-- marital: string (nullable = true) |-- occupation: string (nullable = true) |-- relationship: string (nullable = true) |-- race: string (nullable = true) |-- sex: string (nullable = true) |-- capital_gain: float (nullable = true) |-- capital_loss: float (nullable = true) |-- hours_week: float (nullable = true) |-- native_country: string (nullable = true) |-- label: string (nullable = true) from pyspark.ml.feature import StringIndexer #stringIndexer = StringIndexer(inputCol="label", outputCol="newlabel") #model = stringIndexer.fit(df) #df = model.transform(df) df.printSchema() |
Select columns
Bạn có thể chọn và hiển thị các hàng có lựa chọn và tên của các đặc trưng. Dưới đây, age và fnlwgt được chọn.
1 |
df.select('age','fnlwgt').show(5) |
1 2 3 4 5 6 7 8 9 10 |
+---+------+ |age|fnlwgt| +---+------+ | 39| 77516| | 50| 83311| | 38|215646| | 53|234721| | 28|338409| +---+------+ only showing top 5 rows |
Count by group
Nếu bạn muốn đếm số lần xuất hiện theo nhóm, bạn có thể xâu chuỗi:
- groupBy()
- count()
Trong ví dụ PySpark bên dưới, bạn đếm số hàng theo education level.
1 |
df.groupBy("education").count().sort("count",ascending=True).show() |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 |
+------------+-----+ | education|count| +------------+-----+ | Preschool| 51| | 1st-4th| 168| | 5th-6th| 333| | Doctorate| 413| | 12th| 433| | 9th| 514| | Prof-school| 576| | 7th-8th| 646| | 10th| 933| | Assoc-acdm| 1067| | 11th| 1175| | Assoc-voc| 1382| | Masters| 1723| | Bachelors| 5355| |Some-college| 7291| | HS-grad|10501| +------------+-----+ |
Describe the data
Để nhận thống kê tóm tắt về dữ liệu, bạn có thể sử dụng description():
- count
- mean
- standarddeviation
- min
- max
1 |
df.describe().show() |
1 2 3 4 5 6 7 8 9 |
+-------+------------------+-----------+------------------+------------+-----------------+--------+----------------+------------+------------------+------+------------------+----------------+------------------+--------------+-----+ |summary| age| workclass| fnlwgt| education| education_num| marital| occupation|relationship| race| sex| capital_gain| capital_loss| hours_week|native_country|label| +-------+------------------+-----------+------------------+------------+-----------------+--------+----------------+------------+------------------+------+------------------+----------------+------------------+--------------+-----+ | count| 32561| 32561| 32561| 32561| 32561| 32561| 32561| 32561| 32561| 32561| 32561| 32561| 32561| 32561|32561| | mean| 38.58164675532078| null|189778.36651208502| null| 10.0806793403151| null| null| null| null| null|1077.6488437087312| 87.303829734959|40.437455852092995| null| null| | stddev|13.640432553581356| null|105549.97769702227| null|2.572720332067397| null| null| null| null| null| 7385.292084840354|402.960218649002|12.347428681731838| null| null| | min| 17| ?| 12285| 10th| 1|Divorced| ?| Husband|Amer-Indian-Eskimo|Female| 0| 0| 1| ?|<=50K| | max| 90|Without-pay| 1484705|Some-college| 16| Widowed|Transport-moving| Wife| White| Male| 99999| 4356| 99| Yugoslavia| >50K| +-------+------------------+-----------+------------------+------------+-----------------+--------+----------------+------------+------------------+------+------------------+----------------+------------------+--------------+-----+ |
Nếu bạn muốn thống kê tóm tắt chỉ của một cột, hãy thêm tên của cột vào bên trong description().
1 |
df.describe('capital_gain').show() |
1 2 3 4 5 6 7 8 9 |
+-------+------------------+ |summary| capital_gain| +-------+------------------+ | count| 32561| | mean|1077.6488437087312| | stddev| 7385.292084840354| | min| 0| | max| 99999| +-------+------------------+ |
Crosstab computation
Trong một số trường hợp, có thể thú vị khi xem các thống kê mô tả giữa hai cột theo cặp. Ví dụ: bạn có thể đếm số người có thu nhập dưới hoặc trên 50k theo trình độ học vấn. Thao tác này được gọi là crosstab.
1 |
df.crosstab('age', 'label').sort("age_label").show() |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 |
+---------+-----+----+ |age_label|<=50K|>50K| +---------+-----+----+ | 17| 395| 0| | 18| 550| 0| | 19| 710| 2| | 20| 753| 0| | 21| 717| 3| | 22| 752| 13| | 23| 865| 12| | 24| 767| 31| | 25| 788| 53| | 26| 722| 63| | 27| 754| 81| | 28| 748| 119| | 29| 679| 134| | 30| 690| 171| | 31| 705| 183| | 32| 639| 189| | 33| 684| 191| | 34| 643| 243| | 35| 659| 217| | 36| 635| 263| +---------+-----+----+ only showing top 20 rows |
Bạn có thể thấy không có người nào có doanh thu trên 50k khi họ còn trẻ.
Drop column
Có hai API trực quan để drop columns:
- drop(): Drop a column
- dropna(): Drop NA’s
Bên dưới bạn drop column education_num
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
df.drop('education_num').columns ['age', 'workclass', 'fnlwgt', 'education', 'marital', 'occupation', 'relationship', 'race', 'sex', 'capital_gain', 'capital_loss', 'hours_week', 'native_country', 'label'] |
Filter data
Bạn có thể sử dụng filter () để áp dụng thống kê mô tả trong một tập hợp con dữ liệu. Ví dụ: bạn có thể đếm số người trên 40 tuổi
1 |
df.filter(df.age > 40).count() |
1 |
13443 |
Thống kê mô tả theo nhóm
Cuối cùng, bạn có thể nhóm dữ liệu theo nhóm và tính toán các hoạt động thống kê như giá trị trung bình.
1 |
df.groupby('marital').agg({'capital_gain': 'mean'}).show() |
1 2 3 4 5 6 7 8 9 10 11 |
+--------------------+------------------+ | marital| avg(capital_gain)| +--------------------+------------------+ | Separated| 535.5687804878049| | Never-married|376.58831788823363| |Married-spouse-ab...| 653.9832535885167| | Divorced| 728.4148098131893| | Widowed| 571.0715005035247| | Married-AF-spouse| 432.6521739130435| | Married-civ-spouse|1764.8595085470085| +--------------------+------------------+ |
Bước 2) Tiền xử lý dữ liệu
Xử lý dữ liệu là một bước quan trọng trong học máy. Sau khi xóa dữ liệu rác, bạn sẽ có được một số thông tin chi tiết quan trọng.
Ví dụ, bạn biết rằng tuổi không phải là một hàm tuyến tính với thu nhập. Khi còn trẻ, thu nhập của họ thường thấp hơn tuổi trung niên. Sau khi nghỉ hưu, một hộ gia đình sử dụng tiền tiết kiệm của họ, nghĩa là thu nhập giảm. Để chụp mẫu này, bạn có thể thêm square vào đặc trưng tuổi.
Add age square
Để thêm một đặc trưng mới, bạn cần:
- Chọn cột
- Áp dụng phép biến đổi và thêm nó vào DataFrame
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 |
from pyspark.sql.functions import * # 1 Select the column age_square = df.select(col("age")**2) # 2 Apply the transformation and add it to the DataFrame df = df.withColumn("age_square", col("age")**2) df.printSchema() root |-- age: integer (nullable = true) |-- workclass: string (nullable = true) |-- fnlwgt: integer (nullable = true) |-- education: string (nullable = true) |-- education_num: integer (nullable = true) |-- marital: string (nullable = true) |-- occupation: string (nullable = true) |-- relationship: string (nullable = true) |-- race: string (nullable = true) |-- sex: string (nullable = true) |-- capital_gain: integer (nullable = true) |-- capital_loss: integer (nullable = true) |-- hours_week: integer (nullable = true) |-- native_country: string (nullable = true) |-- label: string (nullable = true) |-- age_square: double (nullable = true) |
Bạn có thể thấy rằng age_square đã được thêm thành công vào khung dữ liệu. Bạn có thể thay đổi thứ tự của các biến với select. Dưới đây, bạn mang theo age_square ngay sau tuổi.
1 2 3 4 5 |
COLUMNS = ['age', 'age_square', 'workclass', 'fnlwgt', 'education', 'education_num', 'marital', 'occupation', 'relationship', 'race', 'sex', 'capital_gain', 'capital_loss', 'hours_week', 'native_country', 'label'] df = df.select(COLUMNS) df.first() |
1 |
Row(age=39, age_square=1521.0, workclass='State-gov', fnlwgt=77516, education='Bachelors', education_num=13, marital='Never-married', occupation='Adm-clerical', relationship='Not-in-family', race='White', sex='Male', capital_gain=2174, capital_loss=0, hours_week=40, native_country='United-States', label='<=50K') |
Loại trừ Holand-Netherlands
Khi một nhóm trong một đặc trưng chỉ có một dữ liệu, nó không mang lại thông tin gì cho mô hình. Ngược lại, nó có thể dẫn đến lỗi trong quá trình cross-validation.
Hãy kiểm tra nguồn gốc của hộ.
1 2 |
df.filter(df.native_country == 'Holand-Netherlands').count() df.groupby('native_country').agg({'native_country': 'count'}).sort(asc("count(native_country)")).show() |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 |
+--------------------+---------------------+ | native_country|count(native_country)| +--------------------+---------------------+ | Holand-Netherlands| 1| | Scotland| 12| | Hungary| 13| | Honduras| 13| |Outlying-US(Guam-...| 14| | Yugoslavia| 16| | Thailand| 18| | Laos| 18| | Cambodia| 19| | Trinadad&Tobago| 19| | Hong| 20| | Ireland| 24| | Ecuador| 28| | Greece| 29| | France| 29| | Peru| 31| | Nicaragua| 34| | Portugal| 37| | Iran| 43| | Haiti| 44| +--------------------+---------------------+ only showing top 20 rows |
Đặc trưng native_country chỉ có một hộ gia đình đến từ Hà Lan. Bạn loại trừ nó.
1 |
df_remove = df.filter(df.native_country != 'Holand-Netherlands') |
Bước 3) Xây dựng pipeline xử lý dữ liệu
Tương tự như scikit-learn, Pyspark có API pipeline.
Một pipeline dẫn rất thuận tiện để duy trì cấu trúc của dữ liệu. Bạn đẩy dữ liệu vào pipeline. Bên trong pipeline, các hoạt động khác nhau được thực hiện, đầu ra được sử dụng để cung cấp cho thuật toán.
Ví dụ: một phép biến đổi phổ quát trong học máy bao gồm chuyển đổi một chuỗi thành một one hot encoder, tức là một cột theo nhóm. One hot encoder thường là một ma trận đầy các số 0.
Các bước để biến đổi dữ liệu rất giống với scikit-learn. Bạn cần phải:
- Lập index chuỗi thành số
- Tạo một bộ one hot encoder
- Chuyển đổi dữ liệu
Hai API thực hiện công việc: StringIndexer, OneHotEncoder
- Trước hết, bạn chọn cột chuỗi để lập chỉ mục. InputCol là tên của cột trong tập dữ liệu. OutputCol là tên mới được đặt cho cột được chuyển đổi.
1StringIndexer(inputCol="workclass", outputCol="workclass_encoded") - Điều chỉnh dữ liệu và biến đổi nó
12model = stringIndexer.fit(df)`indexed = model.transform(df)`` - Tạo các cột news dựa trên nhóm. Ví dụ: nếu có 10 nhóm trong đặc trưng, ma trận mới sẽ có 10 cột, mỗi nhóm một cột.
1OneHotEncoder(dropLast=False, inputCol="workclassencoded", outputCol="workclassvec")
123456789### Example encoderfrom pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssemblerstringIndexer = StringIndexer(inputCol="workclass", outputCol="workclass_encoded")model = stringIndexer.fit(df)indexed = model.transform(df)encoder = OneHotEncoder(dropLast=False, inputCol="workclass_encoded", outputCol="workclass_vec")encoded = encoder.transform(indexed)encoded.show(2)
1234567+---+----------+----------------+------+---------+-------------+------------------+---------------+-------------+-----+----+------------+------------+----------+--------------+-----+-----------------+-------------+|age|age_square| workclass|fnlwgt|education|education_num| marital| occupation| relationship| race| sex|capital_gain|capital_loss|hours_week|native_country|label|workclass_encoded|workclass_vec|+---+----------+----------------+------+---------+-------------+------------------+---------------+-------------+-----+----+------------+------------+----------+--------------+-----+-----------------+-------------+| 39| 1521.0| State-gov| 77516|Bachelors| 13| Never-married| Adm-clerical|Not-in-family|White|Male| 2174| 0| 40| United-States|<=50K| 4.0|(9,[4],[1.0])|| 50| 2500.0|Self-emp-not-inc| 83311|Bachelors| 13|Married-civ-spouse|Exec-managerial| Husband|White|Male| 0| 0| 13| United-States|<=50K| 1.0|(9,[1],[1.0])|+---+----------+----------------+------+---------+-------------+------------------+---------------+-------------+-----+----+------------+------------+----------+--------------+-----+-----------------+-------------+only showing top 2 rows
Xây dựng pipeline
Bạn sẽ xây dựng một pipeline để chuyển đổi tất cả các đặc trưng chính xác và thêm chúng vào tập dữ liệu cuối cùng. Pipeline sẽ có bốn hoạt động, nhưng hãy thoải mái thêm bao nhiêu hoạt động tùy thích.
- Encode dữ liệu phân loại
- Lập Index label feature
- Thêm biến liên tục
- Tập hợp các bước.
Mỗi bước được lưu trữ trong một danh sách có tên các giai đoạn. Danh sách này sẽ cho VectorAssembler biết thao tác nào cần thực hiện bên trong pipeline.
Mã hóa dữ liệu phân loại
Bước này cũng giống như ví dụ trên, ngoại trừ việc bạn lặp lại tất cả các đặc trưng phân loại.
1 2 3 4 5 6 7 8 9 |
from pyspark.ml import Pipeline from pyspark.ml.feature import OneHotEncoderEstimator CATE_FEATURES = ['workclass', 'education', 'marital', 'occupation', 'relationship', 'race', 'sex', 'native_country'] stages = [] # stages in our Pipeline for categoricalCol in CATE_FEATURES: stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol + "Index") encoder = OneHotEncoderEstimator(inputCols=[stringIndexer.getOutputCol()], outputCols=[categoricalCol + "classVec"]) stages += [stringIndexer, encoder] |
Lập index label feature
Spark, giống như nhiều thư viện khác, không chấp nhận các giá trị chuỗi cho nhãn. Bạn chuyển đổi đặc trưng nhãn với StringIndexer và thêm nó vào các giai đoạn danh sách.
1 2 3 |
# Convert label into label indices using the StringIndexer label_stringIdx = StringIndexer(inputCol="label", outputCol="newlabel") stages += [label_stringIdx] |
Thêm biến liên tục
InputCols của VectorAssembler là một danh sách các cột. Bạn có thể tạo một danh sách mới chứa tất cả các cột mới.
1 |
assemblerInputs = [c + "classVec" for c in CATE_FEATURES] + CONTI_FEATURES |
Tập hợp các bước
Cuối cùng, bạn vượt qua tất cả các bước trong VectorAssembler
1 |
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")stages += [assembler] |
Bây giờ tất cả các bước đã sẵn sàng, bạn đẩy dữ liệu vào pipeline.
1 2 3 4 |
# Create a Pipeline. pipeline = Pipeline(stages=stages) pipelineModel = pipeline.fit(df_remove) model = pipelineModel.transform(df_remove) |
Nếu bạn kiểm tra tập dữ liệu mới, bạn có thể thấy rằng nó chứa tất cả các đặc trưng, được chuyển đổi và chưa được chuyển đổi. Bạn chỉ quan tâm đến nhãn mới và các đặc trưng.
1 2 3 |
model.take(1) [Row(age=39, age_square=1521.0, workclass='State-gov', fnlwgt=77516, education='Bachelors', education_num=13, marital='Never-married', occupation='Adm-clerical', relationship='Not-in-family', race='White', sex='Male', capital_gain=2174, capital_loss=0, hours_week=40, native_country='United-States', label='<=50K', workclassIndex=4.0, workclassclassVec=SparseVector(8, {4: 1.0}), educationIndex=2.0, educationclassVec=SparseVector(15, {2: 1.0}), maritalIndex=1.0, maritalclassVec=SparseVector(6, {1: 1.0}), occupationIndex=3.0, occupationclassVec=SparseVector(14, {3: 1.0}), relationshipIndex=1.0, relationshipclassVec=SparseVector(5, {1: 1.0}), raceIndex=0.0, raceclassVec=SparseVector(4, {0: 1.0}), sexIndex=0.0, sexclassVec=SparseVector(1, {0: 1.0}), native_countryIndex=0.0, native_countryclassVec=SparseVector(40, {0: 1.0}), newlabel=0.0, features=SparseVector(99, {4: 1.0, 10: 1.0, 24: 1.0, 32: 1.0, 44: 1.0, 48: 1.0, 52: 1.0, 53: 1.0, 93: 39.0, 94: 77516.0, 95: 2174.0, 96: 13.0, 98: 40.0}))] |
Bước 4) Xây dựng bộ phân loại: logistic
Để tính toán nhanh hơn, bạn chuyển đổi mô hình thành DataFrame.
Bạn cần chọn nhãn mới và các đặc trưng từ mô hình bằng cách sử dụng map.
1 2 |
from pyspark.ml.linalg import DenseVector input_data = model.rdd.map(lambda x: (x["newlabel"], DenseVector(x["features"]))) |
Bạn đã sẵn sàng tạo dữ liệu train dưới dạng DataFrame. Sử dụng sqlContext
1 |
df_train = sqlContext.createDataFrame(input_data, ["label", "features"]) |
Kiểm tra hàng thứ hai
1 |
df_train.show(2) |
1 2 3 4 5 6 7 |
+-----+--------------------+ |label| features| +-----+--------------------+ | 0.0|[0.0,0.0,0.0,0.0,...| | 0.0|[0.0,1.0,0.0,0.0,...| +-----+--------------------+ only showing top 2 rows |
Tạo train/test set
Bạn chia tập dữ liệu 80/20 với randomSplit.
1 2 |
# Split the data into train and test sets train_data, test_data = df_train.randomSplit([.8,.2],seed=1234) |
Hãy đếm xem có bao nhiêu người có thu nhập dưới / trên 50k trong cả tập huấn luyện và kiểm tra.
1 |
train_data.groupby('label').agg({'label': 'count'}).show() |
1 2 3 4 5 6 |
+-----+------------+ |label|count(label)| +-----+------------+ | 0.0| 19698| | 1.0| 6263| +-----+------------+ |
1 |
test_data.groupby('label').agg({'label': 'count'}).show() |
1 2 3 4 5 6 |
+-----+------------+ |label|count(label)| +-----+------------+ | 0.0| 5021| | 1.0| 1578| +-----+------------+ |
Xây dựng bộ hồi quy logistic
Cuối cùng nhưng không kém phần quan trọng, bạn có thể xây dựng bộ phân loại. Pyspark có một API gọi là LogisticRegression để thực hiện hồi quy logistic.
Bạn khởi tạo lr bằng cách chỉ ra cột nhãn và các cột đặc trưng. Đặt tối đa 10 lần lặp và thêm thông số chính quy hóa với giá trị 0,3. Lưu ý rằng trong phần tiếp theo, bạn sẽ sử dụng xác thực chéo với lưới tham số để điều chỉnh mô hình.
1 2 3 4 5 6 7 8 9 10 11 |
# Import `LinearRegression` from pyspark.ml.classification import LogisticRegression # Initialize `lr` lr = LogisticRegression(labelCol="label", featuresCol="features", maxIter=10, regParam=0.3) # Fit the data to the model linearModel = lr.fit(train_data) |
# Bạn có thể xem các hệ số từ hồi quy
1 2 3 |
# Print the coefficients and intercept for logistic regression print("Coefficients: " + str(linearModel.coefficients)) print("Intercept: " + str(linearModel.intercept)) |
1 2 |
Coefficients: [-0.0678914665262,-0.153425526813,-0.0706009536407,-0.164057586562,-0.120655298528,0.162922330862,0.149176870438,-0.626836362611,-0.193483661541,-0.0782269980838,0.222667203836,0.399571096381,-0.0222024341804,-0.311925857859,-0.0434497788688,-0.306007744328,-0.41318209688,0.547937504247,-0.395837350854,-0.23166535958,0.618743906733,-0.344088614546,-0.385266881369,0.317324463006,-0.350518889186,-0.201335923138,-0.232878560088,-0.13349278865,-0.119760542498,0.17500602491,-0.0480968101118,0.288484253943,-0.116314616745,0.0524163478063,-0.300952624551,-0.22046421474,-0.16557996579,-0.114676231939,-0.311966431453,-0.344226119233,0.105530129507,0.152243047814,-0.292774545497,0.263628334433,-0.199951374076,-0.30329422583,-0.231087515178,0.418918551,-0.0565930184279,-0.177818073048,-0.0733236680663,-0.267972912252,0.168491215697,-0.12181255723,-0.385648075442,-0.202101794517,0.0469791640782,-0.00842850210625,-0.00373211448629,-0.259296141281,-0.309896554133,-0.168434409756,-0.11048086026,0.0280647963877,-0.204187030092,-0.414392623536,-0.252806580669,0.143366465705,-0.516359222663,-0.435627370849,-0.301949286524,0.0878249035894,-0.210951740965,-0.621417928742,-0.099445190784,-0.232671473401,-0.1077745606,-0.360429419703,-0.420362959052,-0.379729467809,-0.395186242741,0.0826401853838,-0.280251589972,0.187313505214,-0.20295228799,-0.431177064626,0.149759018379,-0.107114299614,-0.319314858424,0.0028450133235,-0.651220387649,-0.327918792207,-0.143659581445,0.00691075160413,8.38517628783e-08,2.18856717378e-05,0.0266701216268,0.000231075966823,0.00893832698698] Intercept: -1.9884177974805692 |
Bước 5) Đào tạo và đánh giá mô hình
Để tạo dự đoán cho bộ thử Bạn cần phải xem chỉ số độ chính xác để xem mô hình hoạt động tốt (hoặc xấu) như thế nào.nghiệm của bạn. Bạn có thể sử dụng linearModel với transform() trên test_data.
1 2 |
# Make predictions on test data using the transform() method. predictions = linearModel.transform(test_data) |
Bạn có thể in các phần tử trong dự đoán
1 2 3 4 5 6 7 |
predictions.printSchema() root |-- label: double (nullable = true) |-- features: vector (nullable = true) |-- rawPrediction: vector (nullable = true) |-- probability: vector (nullable = true) |-- prediction: double (nullable = false) |
Bạn quan tâm đến nhãn, dự đoán và xác suất
1 2 |
selected = predictions.select("label", "prediction", "probability") selected.show(20) |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 |
+-----+----------+--------------------+ |label|prediction| probability| +-----+----------+--------------------+ | 0.0| 0.0|[0.91560704124179...| | 0.0| 0.0|[0.92812140213994...| | 0.0| 0.0|[0.92161406774159...| | 0.0| 0.0|[0.96222760777142...| | 0.0| 0.0|[0.66363283056957...| | 0.0| 0.0|[0.65571324475477...| | 0.0| 0.0|[0.73053376932829...| | 0.0| 1.0|[0.31265053873570...| | 0.0| 0.0|[0.80005907577390...| | 0.0| 0.0|[0.76482251301640...| | 0.0| 0.0|[0.84447301189069...| | 0.0| 0.0|[0.75691912026619...| | 0.0| 0.0|[0.60902504096722...| | 0.0| 0.0|[0.80799228385509...| | 0.0| 0.0|[0.87704364852567...| | 0.0| 0.0|[0.83817652582377...| | 0.0| 0.0|[0.79655423248500...| | 0.0| 0.0|[0.82712311232246...| | 0.0| 0.0|[0.81372823882016...| | 0.0| 0.0|[0.59687710752201...| +-----+----------+--------------------+ only showing top 20 rows |
Đánh giá mô hình
Bạn cần phải xem chỉ số độ chính xác để xem mô hình hoạt động tốt (hoặc xấu) như thế nào. Hiện tại, không có API nào để tính toán độ chính xác trong Spark. Giá trị mặc định là ROC (receiver operating characteristic curve).
Trước khi bạn xem xét ROC, hãy xây dựng thước đo độ chính xác. Thước đo độ chính xác là tổng của dự đoán đúng trên tổng số quan sát.
Bạn tạo một DataFrame với nhãn và dự đoán
1 |
cm = predictions.select("label", "prediction") |
Bạn có thể kiểm tra số lượng lớp trong nhãn và dự đoán
1 |
cm.groupby('label').agg({'label': 'count'}).show() |
1 2 3 4 5 6 |
+-----+------------+ |label|count(label)| +-----+------------+ | 0.0| 5021| | 1.0| 1578| +-----+------------+ |
1 |
cm.groupby('prediction').agg({'prediction': 'count'}).show() |
1 2 3 4 5 6 |
+----------+-----------------+ |prediction|count(prediction)| +----------+-----------------+ | 0.0| 5982| | 1.0| 617| +----------+-----------------+ |
Ví dụ, trong tập thử nghiệm, có 1578 hộ gia đình có thu nhập trên 50k và 5021 hộ dưới. Tuy nhiên, phân loại dự đoán 617 hộ gia đình có thu nhập trên 50 nghìn.
Bạn có thể tính độ chính xác bằng cách tính số lượng khi nhãn được phân loại chính xác trên tổng số hàng.
1 |
cm.filter(cm.label == cm.prediction).count() / cm.count() |
1 |
0.8237611759357478 |
Bạn có thể kết hợp mọi thứ lại với nhau và viết một hàm để tính độ chính xác.
1 2 3 4 5 6 7 |
def accuracy_m(model): predictions = model.transform(test_data) cm = predictions.select("label", "prediction") acc = cm.filter(cm.label == cm.prediction).count() / cm.count() print("Model accuracy: %.3f%%" % (acc * 100)) accuracy_m(model = linearModel) Model accuracy: 82.376% |
ROC metrics
Mô-đun BinaryClassificationEvaluator bao gồm các biện pháp ROC. Receiver Operating Characteristic curve là một công cụ phổ biến khác được sử dụng với phân loại nhị phân. Nó rất giống với precision/recall nhưng thay vì vẽ biểu đồ precision so với recall. ROC cho thấy tỷ lệ dương tính thực sự (tức là recall) so với tỷ lệ dương tính giả.Tỷ lệ dương tính giả là tỷ lệ các trường hợp tiêu cực được phân loại không chính xác là dương tính. Tỷ lệ âm thực sự còn được gọi là độ đặc hiệu. Do đó, đường cong ROC biểu thị độ nhạy (recall) so với 1 – độ đặc hiệu.
1 2 3 4 5 6 7 |
### Use ROC from pyspark.ml.evaluation import BinaryClassificationEvaluator # Evaluate model evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction") print(evaluator.evaluate(predictions)) print(evaluator.getMetricName()) |
1 |
0.8940481662695192areaUnderROC |
1 |
print(evaluator.evaluate(predictions)) |
1 |
0.8940481662695192 |
Bước 6) Điều chỉnh siêu tham số
Cuối cùng nhưng không kém phần quan trọng, bạn có thể điều chỉnh các siêu tham số.
Để giảm thời gian tính toán, bạn chỉ điều chỉnh tham số chính quy chỉ với hai giá trị.
1 2 3 4 5 6 |
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator # Create ParamGrid for Cross Validation paramGrid = (ParamGridBuilder() .addGrid(lr.regParam, [0.01, 0.5]) .build()) |
Cuối cùng, bạn đánh giá mô hình bằng cách sử dụng phương pháp cross valiation.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
from time import * start_time = time() # Create 5-fold CrossValidator cv = CrossValidator(estimator=lr, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5) # Run cross validations cvModel = cv.fit(train_data) # likely take a fair amount of time end_time = time() elapsed_time = end_time - start_time print("Time to train model: %.3f seconds" % elapsed_time) |
Thời gian đào tạo mô hình: 978.807 giây
Siêu tham số đo chính quy tốt nhất là 0,01, với độ chính xác 85,316 phần trăm.
1 2 |
accuracy_m(model = cvModel) Model accuracy: 85.316% |
Bạn có thể loại trừ tham số được đề xuất bằng cách chaining cvModel.bestModel với extractParamMap().
1 2 |
bestModel = cvModel.bestModel bestModel.extractParamMap() |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
{Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='aggregationDepth', doc='suggested depth for treeAggregate (>= 2)'): 2, Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty'): 0.0, Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='family', doc='The name of family which is a description of the label distribution to be used in the model. Supported options: auto, binomial, multinomial.'): 'auto', Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='featuresCol', doc='features column name'): 'features', Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='fitIntercept', doc='whether to fit an intercept term'): True, Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='labelCol', doc='label column name'): 'label', Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='maxIter', doc='maximum number of iterations (>= 0)'): 10, Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='predictionCol', doc='prediction column name'): 'prediction', Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='probabilityCol', doc='Column name for predicted class conditional probabilities. Note: Not all models output well-calibrated probability estimates! These probabilities should be treated as confidences, not precise probabilities'): 'probability', Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='rawPredictionCol', doc='raw prediction (a.k.a. confidence) column name'): 'rawPrediction', Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='regParam', doc='regularization parameter (>= 0)'): 0.01, Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='standardization', doc='whether to standardize the training features before fitting the model'): True, Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='threshold', doc='threshold in binary classification prediction, in range [0, 1]'): 0.5, Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='tol', doc='the convergence tolerance for iterative algorithms (>= 0)'): 1e-06} |
Kết Luận
Spark là một công cụ cơ bản cho một nhà khoa học dữ liệu. Nó cho phép kết nối ứng dụng với các nguồn dữ liệu khác nhau, thực hiện phân tích dữ liệu một cách liền mạch hoặc thêm mô hình dự đoán.
Để bắt đầu với Spark, bạn cần bắt đầu Spark Context với:
`SparkContext()“
Và SQL context để kết nối với nguồn dữ liệu:
`SQLContext()“
Trong bài viết này, chúng ta đã học cách huấn luyện hồi quy logistic:
Chuyển đổi tập dữ liệu thành Dataframe với:
1 2 |
rdd.map(lambda x: (x["newlabel"], DenseVector(x["features"]))) sqlContext.createDataFrame(input_data, ["label", "features"]) |
Tạo train/test set
1 |
randomSplit([.8,.2],seed=1234) |
Đào tạo mô hình
1 |
LogisticRegression(labelCol="label",featuresCol="features",maxIter=10, regParam=0.3) |
1 |
lr.fit() |
Đưa ra dự đoán
1 |
linearModel.transform() |
Bài viết tiếp theo: Cách dùng Scikit-Learn – Machine Learning bằng Python