-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtest_models_spark.py
131 lines (86 loc) · 4.51 KB
/
test_models_spark.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
# Initializing a Spark session
from pyspark.sql import SparkSession
import numpy as np
from pyspark.sql.functions import when
from pyspark.ml.feature import Imputer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StandardScaler
from pyspark.ml.feature import ChiSqSelector
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
# Create (or reuse) a local Spark session for this diabetes-classification script.
# NOTE(review): "diabeties" is a typo but is a runtime string (the Spark app
# name), so it is reproduced verbatim here.
spark = (
    SparkSession.builder
    .master("local")
    .appName("diabeties")
    .config("spark.some.config.option", "some-value")
    .getOrCreate()
)
# Load the diabetes CSV (first row is the header; column types are inferred).
raw_data = (
    spark.read.format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load(r"file:///home/maria_dev/diabetes.csv")
)

# In this dataset a literal 0 in these physiological columns denotes a missing
# measurement, so turn those zeros into NaN ahead of imputation.
for column in ("Glucose", "BloodPressure", "SkinThickness", "BMI", "Insulin"):
    raw_data = raw_data.withColumn(
        column,
        when(raw_data[column] == 0, np.nan).otherwise(raw_data[column]),
    )
# Fill the NaNs introduced above with per-column means (Imputer's default
# strategy), writing the imputed values back into the same columns.
impute_cols = ["Glucose", "BloodPressure", "SkinThickness", "BMI", "Insulin"]
imputer = Imputer(inputCols=impute_cols, outputCols=impute_cols)
raw_data = imputer.fit(raw_data).transform(raw_data)
# Pack every column except the label ("Outcome") into a single "features"
# vector, then standardise it into "Scaled_features" (StandardScaler's default
# scales to unit standard deviation without centering).
feature_cols = [c for c in raw_data.columns if c != "Outcome"]

assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
raw_data = assembler.transform(raw_data)
raw_data.select("features").show(truncate=False)

standardscaler = StandardScaler(inputCol="features", outputCol="Scaled_features")
raw_data = standardscaler.fit(raw_data).transform(raw_data)
# 80/20 train/test split with a fixed seed for reproducibility.
train, test = raw_data.randomSplit([0.8, 0.2], seed=12345)

# Class balancing: each positive example is weighted by the negative-class
# share of the training set, and each negative by the positive-class share,
# so the weighted mass of the two classes is equal.
dataset_size = float(train.select("Outcome").count())
numPositives = train.select("Outcome").where("Outcome == 1").count()
per_ones = 100 * float(numPositives) / float(dataset_size)
numNegatives = float(dataset_size - numPositives)
BalancingRatio = numNegatives / dataset_size
train = train.withColumn(
    "classWeights",
    when(train.Outcome == 1, BalancingRatio).otherwise(1 - BalancingRatio),
)
# Chi-squared feature selection: keep features whose test p-value is below
# fpr=0.05, emitting the selected subset as the "Aspect" vector.
css = ChiSqSelector(featuresCol='Scaled_features', outputCol='Aspect', labelCol='Outcome', fpr=0.05)
# BUG FIX: the selector was previously re-fit on the test set
# (`css.fit(test)`), which leaks test labels into feature selection and can
# choose a *different* feature subset for test than for train, making the two
# "Aspect" columns incompatible. Fit once on train and apply to both splits.
css_model = css.fit(train)
train = css_model.transform(train)
test = css_model.transform(test)
# Class-weighted logistic regression on the chi-squared-selected features.
lr = LogisticRegression(
    labelCol="Outcome",
    featuresCol="Aspect",
    weightCol="classWeights",
    maxIter=10,
)
model = lr.fit(train)
predict_train = model.transform(train)
predict_test = model.transform(test)

# Area under the ROC curve on both splits (the evaluator's default metric).
evaluator = BinaryClassificationEvaluator(
    rawPredictionCol="rawPrediction",
    labelCol="Outcome",
)
predict_test.select("Outcome", "rawPrediction", "prediction", "probability").show(5)
print("The area under ROC for train set is {}".format(evaluator.evaluate(predict_train)))
print("Test area under ROC {}".format(evaluator.evaluate(predict_test)))
# Model 2: decision tree classifier on the full (unselected) "features" vector.
from pyspark.ml.classification import DecisionTreeClassifier

dt = DecisionTreeClassifier(labelCol="Outcome", featuresCol="features")
dt_model = dt.fit(train)
dt_prediction = dt_model.transform(test)
# BUG FIX: `evaluator` is a BinaryClassificationEvaluator, which computes
# area under the ROC curve — the old messages mislabelled it as "Accuracy"
# and printed 1-AUC as a "Test Error". Relabel the output honestly.
dt_auc = evaluator.evaluate(dt_prediction)
print("Area under ROC of DecisionTreeClassifier is = %g" % (dt_auc))
print("1 - AUC of DecisionTreeClassifier = %g " % (1.0 - dt_auc))
# Model 3: naive Bayes on the raw "features" vector.
# NOTE(review): Spark's NaiveBayes requires non-negative feature values; the
# unscaled "features" column satisfies this here — confirm before switching
# it to a scaled/centered column.
from pyspark.ml.classification import NaiveBayes

nb = NaiveBayes(labelCol="Outcome", featuresCol="features")
nb_model = nb.fit(train)
nb_prediction = nb_model.transform(test)
# BUG FIX: the evaluator returns area under ROC, not accuracy — the printed
# label was misleading. Relabel the metric.
nb_auc = evaluator.evaluate(nb_prediction)
print("Area under ROC of Naive Bayes is = %g" % (nb_auc))
# Model 4: gradient-boosted trees (10 boosting iterations) on "features".
from pyspark.ml.classification import GBTClassifier

gbt = GBTClassifier(labelCol="Outcome", featuresCol="features", maxIter=10)
gbt_model = gbt.fit(train)
gbt_prediction = gbt_model.transform(test)
# BUG FIX: relabel the metric (the evaluator computes area under ROC, not
# accuracy) and fix the "classifie" typo in both printed messages. A stale
# commented-out line referencing a "Survived" column (copied from a Titanic
# example) was removed.
gbt_auc = evaluator.evaluate(gbt_prediction)
print("Area under ROC of Gradient-boosted tree classifier is = %g" % (gbt_auc))
print("1 - AUC of Gradient-boosted tree classifier = %g" % (1.0 - gbt_auc))