
Predicting Breast Cancer Using Apache Spark Machine Learning Logistic Regression

In this blog post, I’ll help you get started using Apache Spark’s Logistic Regression for predicting cancer malignancy.
The goal of Spark’s ML library is to provide a set of APIs on top of DataFrames that help users create and tune machine learning workflows or pipelines. Using the library with DataFrames improves performance through intelligent optimizations.


Classification is a family of supervised machine learning algorithms that identify which category an item belongs to (for example, whether a cancer tissue observation is malignant or not), based on labeled examples of known items (for example, observations known to be malignant or not). Classification takes a set of data with known labels and pre-determined features and learns how to label new records based on that information. Features are the “if questions” that you ask. The label is the answer to those questions. For example, if it walks, swims, and quacks like a duck, then the label is “duck.”


Let’s go through an example of Cancer Tissue Observations:

  • What are we trying to predict?
    • Whether a sample observation is malignant or not.
    • This is the Label: malignant or not.
  • What are the “if questions” or properties that you can use to predict?
    • Tissue sample characteristics: Clump Thickness, Uniformity of Cell Size, Uniformity of Cell Shape, Marginal Adhesion, Single Epithelial Cell Size, Bare Nuclei, Bland Chromatin, Normal Nucleoli, Mitoses.
    • These are the Features. To build a classifier model, you extract the features of interest that most contribute to the classification.

Logistic Regression

Logistic regression is a popular method to predict a binary response. It is a special case of Generalized Linear models that predicts the probability of the outcome. Logistic regression measures the relationship between the Y “Label” and the X “Features” by estimating probabilities using a logistic function. The model predicts a probability which is used to predict the label class.
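To make the logistic function concrete, here is a minimal sketch in plain Scala (no Spark needed). The object name LogisticSketch, the method names, and the 0.5 threshold are illustrative assumptions, not part of the model trained later in this post.

```scala
// Minimal sketch of how logistic regression turns features into a label.
// All names here are illustrative assumptions.
object LogisticSketch {
  // The logistic (sigmoid) function maps any real number into (0, 1).
  def sigmoid(z: Double): Double = 1.0 / (1.0 + math.exp(-z))

  // Probability of the positive class: sigmoid(intercept + weights . features).
  def probability(weights: Array[Double], intercept: Double, features: Array[Double]): Double = {
    require(weights.length == features.length, "weights and features must have the same length")
    val z = intercept + weights.zip(features).map { case (w, x) => w * x }.sum
    sigmoid(z)
  }

  // The predicted label is 1.0 when the probability exceeds the threshold.
  def predict(p: Double, threshold: Double = 0.5): Double =
    if (p > threshold) 1.0 else 0.0
}
```

Calling LogisticSketch.predict(LogisticSketch.probability(w, b, x)) gives the class for one observation. Raising or lowering the threshold trades false positives against false negatives.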


Analyze Cancer Observations with Spark Machine Learning Scenario

Our data is from the Wisconsin Breast Cancer (Original) Data Set, which categorizes breast tumor cases as either benign or malignant. Each case is described by 9 features that are used to predict the diagnosis. For each cancer observation, we have the following information:

1. Sample code number: id number 
2. Clump Thickness: 1 - 10 
3. Uniformity of Cell Size: 1 - 10 
4. Uniformity of Cell Shape: 1 - 10 
5. Marginal Adhesion: 1 - 10 
6. Single Epithelial Cell Size: 1 - 10 
7. Bare Nuclei: 1 - 10 
8. Bland Chromatin: 1 - 10 
9. Normal Nucleoli: 1 - 10 
10. Mitoses: 1 - 10 
11. Class: (2 for benign, 4 for malignant)

The Cancer Observation csv file has one comma-separated line per observation, with the fields in the order listed above.


In this scenario, we will build a logistic regression model to predict the label / classification of malignant or not based on the following features:

  • Label → malignant or benign (1 or 0)
  • Features → {Clump Thickness, Uniformity of Cell Size, Uniformity of Cell Shape, Marginal Adhesion, Single Epithelial Cell Size, Bare Nuclei, Bland Chromatin, Normal Nucleoli, Mitoses }

Spark ML provides a uniform set of high-level APIs built on top of DataFrames. The main concepts in Spark ML are:

  • DataFrame: The ML API uses DataFrames from Spark SQL as an ML dataset.
  • Transformer: A Transformer is an algorithm which transforms one DataFrame into another DataFrame. For example, turning a DataFrame with features into a DataFrame with predictions.
  • Estimator: An Estimator is an algorithm which can be fit on a DataFrame to produce a Transformer. For example, training/tuning on a DataFrame and producing a model.
  • Pipeline: A Pipeline chains multiple Transformers and Estimators together to specify a ML workflow.
  • ParamMaps: Parameters to choose from, sometimes called a “parameter grid” to search over.
  • Evaluator: Metric to measure how well a fitted Model does on held-out test data.
  • CrossValidator: Identifies the best ParamMap and re-fits the Estimator using the best ParamMap and the entire dataset.
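The relationship between an Estimator and a Transformer can be illustrated with a toy model in plain Scala. This is only a sketch of the contract: the traits, the Row type, and MeanCenterer below are hypothetical stand-ins defined here for illustration, not Spark's classes.

```scala
// Toy model of the Estimator / Transformer contract. These traits are
// hypothetical stand-ins, not the Spark ML classes of the same name.
object PipelineConceptSketch {
  type Row = Map[String, Double]

  // A Transformer turns one dataset into another.
  trait Transformer { def transform(rows: Seq[Row]): Seq[Row] }

  // An Estimator is fit on a dataset and produces a Transformer.
  trait Estimator { def fit(rows: Seq[Row]): Transformer }

  // Fitting learns the mean of a column; the fitted Transformer adds a centered copy.
  class MeanCenterer(col: String) extends Estimator {
    def fit(rows: Seq[Row]): Transformer = {
      val mean = rows.map(_(col)).sum / rows.size
      val outCol = col + "_centered"
      new Transformer {
        def transform(rs: Seq[Row]): Seq[Row] =
          rs.map(r => r + (outCol -> (r(col) - mean)))
      }
    }
  }

  val rows: Seq[Row] = Seq(Map("thickness" -> 1.0), Map("thickness" -> 3.0))
  val model: Transformer = new MeanCenterer("thickness").fit(rows)
  val centered: Seq[Double] = model.transform(rows).map(_("thickness_centered"))
}
```

The same split appears in this tutorial: LogisticRegression is an Estimator, and the model returned by fitting it is a Transformer that adds prediction columns.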

In this example, we will use the following Spark ML workflow: load and parse the data, extract the features, train the model, and then test it.


This tutorial will run on Spark 1.6.1.

Log into the MapR Sandbox, as explained in Getting Started with Spark on MapR Sandbox, using userid user01, password mapr. Copy the sample data file to your sandbox home directory /user/user01 using scp. (Note: you may have to update the Spark version on your Sandbox.) Start the Spark shell with:

$ spark-shell --master local[1]

Load and Parse the Data from a csv File

First, we will import the machine learning packages.
(In the code boxes, lines beginning with // are comments, and the output follows the code.)

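// core Spark, RDD and SQL classes
import org.apache.spark._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
// sqlContext is predefined in the Spark 1.6 shell
import sqlContext.implicits._
import sqlContext._
import org.apache.spark.sql.functions._
// Spark ML feature transformers, classifier and evaluator used below
import org.apache.spark.ml.feature.{StringIndexer, VectorAssembler}
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
// MLlib classes used for the additional metrics
import org.apache.spark.mllib.linalg.DenseVector
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics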

We use a Scala case class to define the schema corresponding to a line in the csv data file.

// define the Cancer Observation Schema
case class Obs(clas: Double, thickness: Double, size: Double, shape: Double, madh: Double, epsize: Double, bnuc: Double, bchrom: Double, nNuc: Double, mit: Double)

The functions below parse a line from the data file into the Cancer Observation class.

// function to create an Obs class from an Array of Double. Class Malignant 4 is changed to 1
def parseObs(line: Array[Double]): Obs = {
  Obs(if (line(9) == 4.0) 1.0 else 0.0, line(0), line(1), line(2), line(3), line(4), line(5), line(6), line(7), line(8))
}
// function to transform an RDD of Strings into an RDD of Double, filter lines with ?, remove first column
def parseRDD(rdd: RDD[String]): RDD[Array[Double]] = {
  rdd.map(_.split(",")).filter(_(6) != "?").map(_.drop(1)).map(_.map(_.toDouble))
}

Below we load the data from the csv file into an RDD of Strings. The parseRDD function transforms each String element in the RDD into an Array of Double, filtering out lines with missing values and removing the id column. Then we use a map transformation, which applies the parseObs function to transform each Array of Double in the RDD into a Cancer Observation (Obs) object. The toDF() method transforms the RDD of Obs objects into a DataFrame with the Cancer Observation class schema.


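// load the data into a DataFrame
val rdd = sc.textFile("data/breast_cancer_wisconsin_data.txt")
val obsRDD = parseRDD(rdd).map(parseObs)
val obsDF = obsRDD.toDF().cache()
// register the DataFrame as a temporary table named obs so it can be queried with SQL
obsDF.registerTempTable("obs")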
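The parsing steps can also be tried without a cluster. The sketch below applies the same split, filter, drop, and convert logic to an ordinary Seq of Strings. The object name ParseSketch and the three input lines are made-up examples in the file's column layout, not rows copied from the data set.

```scala
// Plain-Scala sketch of the parsing steps, applied to a Seq instead of an RDD.
// The input lines are made-up examples, not rows from the real file.
object ParseSketch {
  case class Obs(clas: Double, thickness: Double, size: Double, shape: Double, madh: Double,
                 epsize: Double, bnuc: Double, bchrom: Double, nNuc: Double, mit: Double)

  // Split on commas, drop rows whose Bare Nuclei field is "?", drop the id column,
  // convert to Double, and recode class 4 (malignant) as 1.0 and anything else as 0.0.
  def parseLines(lines: Seq[String]): Seq[Obs] =
    lines.map(_.split(","))
      .filter(_(6) != "?")
      .map(_.drop(1).map(_.toDouble))
      .map(a => Obs(if (a(9) == 4.0) 1.0 else 0.0,
                    a(0), a(1), a(2), a(3), a(4), a(5), a(6), a(7), a(8)))

  val sample: Seq[String] = Seq(
    "1,5,1,1,1,2,1,3,1,1,2",    // benign (class 2)
    "2,8,10,10,8,7,10,9,7,1,4", // malignant (class 4)
    "3,4,1,1,1,2,?,2,1,1,2"     // missing Bare Nuclei, filtered out
  )
}
```

ParseSketch.parseLines(ParseSketch.sample) keeps the first two lines and drops the third, because the "?" check runs before any conversion to Double.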

DataFrame printSchema() prints the schema to the console in a tree format.

// Return the schema of this DataFrame
obsDF.printSchema

root
 |-- clas: double (nullable = false)
 |-- thickness: double (nullable = false)
 |-- size: double (nullable = false)
 |-- shape: double (nullable = false)
 |-- madh: double (nullable = false)
 |-- epsize: double (nullable = false)
 |-- bnuc: double (nullable = false)
 |-- bchrom: double (nullable = false)
 |-- nNuc: double (nullable = false)
 |-- mit: double (nullable = false)

// Display the top 20 rows of DataFrame
obsDF.show

|clas|thickness|size|shape|madh|epsize|bnuc|bchrom|nNuc|mit|
| 0.0|      5.0| 1.0|  1.0| 1.0|   2.0| 1.0|   3.0| 1.0|1.0|
| 0.0|      5.0| 4.0|  4.0| 5.0|   7.0|10.0|   3.0| 2.0|1.0|
| 0.0|      3.0| 1.0|  1.0| 1.0|   2.0| 2.0|   3.0| 1.0|1.0|
only showing top 20 rows

After a DataFrame is instantiated, you can query it using SQL queries. Here are some example queries using the Scala DataFrame API:

describe computes statistics for the thickness column, including count, mean, stddev, min, and max.

// computes statistics for thickness
obsDF.describe("thickness").show

|summary|         thickness|
|  count|               683|
|   mean|  4.44216691068814|
| stddev|2.8207613188371288|
|    min|               1.0|
|    max|              10.0|

// compute the avg thickness, size, shape grouped by clas (malignant or not) 
sqlContext.sql("SELECT clas, avg(thickness) as avgthickness, avg(size) as avgsize, avg(shape) as avgshape FROM obs GROUP BY clas ").show

|clas|     avgthickness|           avgsize|          avgshape|
| 1.0|7.188284518828452| 6.577405857740586| 6.560669456066946|
| 0.0|2.963963963963964|1.3063063063063063|1.4144144144144144|

// compute avg thickness grouped by clas (malignant or not)
obsDF.groupBy("clas").avg("thickness").show

|clas|   avg(thickness)|
| 1.0|7.188284518828452|
| 0.0|2.963963963963964|

Extract Features

To build a classifier model, you first extract the features that most contribute to the classification. In the cancer data set, the data is labeled with two classes – 1 (malignant) and 0 (not malignant).

The features for each item consist of the fields shown below:

  • Label → malignant: 0 or 1
  • Features → {“thickness”, “size”, “shape”, “madh”, “epsize”, “bnuc”, “bchrom”, “nNuc”, “mit”}

Define Features Array


(reference Learning Spark)

In order for the features to be used by a machine learning algorithm, the features are transformed and put into Feature Vectors, which are vectors of numbers representing the value for each feature.

Below, a VectorAssembler is used to transform and return a new DataFrame with all of the feature columns in a vector column.


//define the feature columns to put in the feature vector
val featureCols = Array("thickness", "size", "shape", "madh", "epsize", "bnuc", "bchrom", "nNuc", "mit")

//set the input and output column names
val assembler = new VectorAssembler().setInputCols(featureCols).setOutputCol("features")
//return a dataframe with all of the  feature columns in  a vector column
val df2 = assembler.transform(obsDF)
// the transform method produced a new column: features.
df2.show

|clas|thickness|size|shape|madh|epsize|bnuc|bchrom|nNuc|mit|            features|
| 0.0|      5.0| 1.0|  1.0| 1.0|   2.0| 1.0|   3.0| 1.0|1.0|[5.0,1.0,1.0,1.0,...|
| 0.0|      5.0| 4.0|  4.0| 5.0|   7.0|10.0|   3.0| 2.0|1.0|[5.0,4.0,4.0,5.0,...|
| 1.0|      8.0|10.0| 10.0| 8.0|   7.0|10.0|   9.0| 7.0|1.0|[8.0,10.0,10.0,8....|

Next, we use a StringIndexer to return a DataFrame with the clas (malignant or not) column added as a label.


// Create a label column with the StringIndexer
val labelIndexer = new StringIndexer().setInputCol("clas").setOutputCol("label")
val df3 = labelIndexer.fit(df2).transform(df2)
// the transform method produced a new column: label.
df3.show

|clas|thickness|size|shape|madh|epsize|bnuc|bchrom|nNuc|mit|            features|label|
| 0.0|      5.0| 1.0|  1.0| 1.0|   2.0| 1.0|   3.0| 1.0|1.0|[5.0,1.0,1.0,1.0,...|  0.0|
| 0.0|      5.0| 4.0|  4.0| 5.0|   7.0|10.0|   3.0| 2.0|1.0|[5.0,4.0,4.0,5.0,...|  0.0|
| 0.0|      3.0| 1.0|  1.0| 1.0|   2.0| 2.0|   3.0| 1.0|1.0|[3.0,1.0,1.0,1.0,...|  0.0|

Below, the data is split into a training data set and a test data set: 70% of the data is used to train the model, and 30% is used for testing.

//  split the dataframe into training and test data
val splitSeed = 5043 
val Array(trainingData, testData) = df3.randomSplit(Array(0.7, 0.3), splitSeed)
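A seeded random split can be sketched in plain Scala as follows. This is an illustration of the idea only, not Spark's implementation: the object name SplitSketch and its randomSplit method are assumptions made for this example.

```scala
// Sketch of a seeded random split: each element goes to the training set with
// probability trainFraction, otherwise to the test set. Not Spark's implementation.
object SplitSketch {
  def randomSplit[A](data: Seq[A], trainFraction: Double, seed: Long): (Seq[A], Seq[A]) = {
    require(trainFraction >= 0.0 && trainFraction <= 1.0, "trainFraction must be in [0, 1]")
    val rng = new scala.util.Random(seed)
    // Tag every element with one random draw, then partition on the tag.
    val tagged = data.map(x => (x, rng.nextDouble() < trainFraction))
    val (train, test) = tagged.partition(_._2)
    (train.map(_._1), test.map(_._1))
  }
}
```

Because every element is assigned independently, the split sizes are only approximately 70% and 30%. Fixing the seed makes the split repeatable, which is the purpose of splitSeed above.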

Train the Model


Next, we train the logistic regression model with elastic net regularization.

The model is trained by making associations between the input features and the labeled output associated with those features.


// create the classifier, set parameters for training
val lr = new LogisticRegression().setMaxIter(10).setRegParam(0.3).setElasticNetParam(0.8)
// use logistic regression to train (fit) the model with the training data
val model = lr.fit(trainingData)

// Print the coefficients and intercept for logistic regression
println(s"Coefficients: ${model.coefficients} Intercept: ${model.intercept}")

Coefficients: (9,[1,2,5,6],[0.06503554553146387,0.07181362361391264,0.07583963853124673,0.0012675057388232965]) Intercept: -1.39319142312609
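The two regularization settings used above combine into a single penalty on the coefficients: regParam times (alpha times the L1 norm plus (1 - alpha) / 2 times the squared L2 norm), where alpha is the elastic net parameter. The sketch below computes that penalty in plain Scala; the object and method names are illustrative assumptions.

```scala
// Sketch of the elastic net penalty added to the training loss.
// penalty = regParam * (alpha * ||w||_1 + (1 - alpha) / 2 * ||w||_2^2)
object ElasticNetSketch {
  def penalty(weights: Array[Double], regParam: Double, alpha: Double): Double = {
    val l1 = weights.map(w => math.abs(w)).sum   // sum of absolute values
    val l2Squared = weights.map(w => w * w).sum  // sum of squares
    regParam * (alpha * l1 + (1.0 - alpha) / 2.0 * l2Squared)
  }
}
```

With alpha = 0.8, most of the penalty comes from the L1 term, which tends to push coefficients to exactly zero. That is consistent with the sparse coefficient vector printed above, where only 4 of the 9 features (indices 1, 2, 5, and 6) have nonzero weights.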

Test the Model

Next we use the test data to get predictions.


// run the model on test features to get predictions
val predictions = model.transform(testData)
// As you can see, the previous model transform produced new columns: rawPrediction, probability and prediction.
predictions.show

|clas|thickness|size|shape|madh|epsize|bnuc|bchrom|nNuc|mit|            features|label|       rawPrediction|         probability|prediction|
| 0.0|      1.0| 1.0|  1.0| 1.0|   1.0| 1.0|   1.0| 3.0|1.0|[1.0,1.0,1.0,1.0,...|  0.0|[1.17923510971064...|[0.76481024658406...|       0.0|
| 0.0|      1.0| 1.0|  1.0| 1.0|   1.0| 1.0|   3.0| 1.0|1.0|[1.0,1.0,1.0,1.0,...|  0.0|[1.17670009823299...|[0.76435395397908...|       0.0|
| 0.0|      1.0| 1.0|  1.0| 1.0|   1.0| 1.0|   3.0| 1.0|1.0|[1.0,1.0,1.0,1.0,...|  0.0|[1.17670009823299...|[0.76435395397908...|       0.0|
| 0.0|      1.0| 1.0|  1.0| 1.0|   2.0| 1.0|   1.0| 1.0|1.0|[1.0,1.0,1.0,1.0,...|  0.0|[1.17923510971064...|[0.76481024658406...|       0.0|
| 0.0|      1.0| 1.0|  1.0| 1.0|   2.0| 1.0|   2.0| 1.0|1.0|[1.0,1.0,1.0,1.0,...|  0.0|[1.17796760397182...|[0.76458217679258...|       0.0|

Below we evaluate the predictions. We use a BinaryClassificationEvaluator, which returns the area under the ROC curve by comparing the test label column with the test rawPrediction column. In this case, the evaluation returns an area under the ROC curve of 0.99.


// A common metric used for logistic regression is area under the ROC curve (AUC). We can use the BinaryClassificationEvaluator to obtain the AUC.
// create an Evaluator for binary classification, which expects two input columns: rawPrediction and label.
val evaluator = new BinaryClassificationEvaluator().setLabelCol("label").setRawPredictionCol("rawPrediction").setMetricName("areaUnderROC")
// Evaluates predictions and returns a scalar metric areaUnderROC (larger is better).
// Note: despite its name, the value stored in accuracy is the area under the ROC curve.
val accuracy = evaluator.evaluate(predictions)
accuracy: Double = 0.9926910299003322

Below we calculate some more metrics. The number of false and true positive and negative predictions is also useful:

  • True positives are how often the model correctly predicted a tumour was malignant
  • False positives are how often the model predicted a tumour was malignant when it was benign
  • True negatives are how often the model correctly predicted a tumour was benign
  • False negatives indicate how often the model predicted a tumour was benign when in fact it was malignant

// Calculate Metrics
val lp = predictions.select("label", "prediction")
val counttotal = predictions.count()
val correct = lp.filter($"label" === $"prediction").count()
val wrong = lp.filter(not($"label" === $"prediction")).count()
// Note: truep counts correct predictions of 0.0 (benign). With malignant (1.0) as the
// positive class, these are true negatives; the true positives are correct - truep.
val truep = lp.filter($"prediction" === 0.0).filter($"label" === $"prediction").count()
val falseN = lp.filter($"prediction" === 0.0).filter(not($"label" === $"prediction")).count()
val falseP = lp.filter($"prediction" === 1.0).filter(not($"label" === $"prediction")).count()
val ratioWrong=wrong.toDouble/counttotal.toDouble
val ratioCorrect=correct.toDouble/counttotal.toDouble

counttotal: Long = 199
correct: Long = 168
wrong: Long = 31
truep: Long = 128
falseN: Long = 30
falseP: Long = 1
ratioWrong: Double = 0.15577889447236182
ratioCorrect: Double = 0.8442211055276382
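
The counts above are enough to derive precision and recall by hand. The sketch below does that arithmetic in plain Scala; the object and method names are illustrative assumptions. It treats malignant (1.0) as the positive class, so the 128 printed as truep (correct predictions of 0.0) are used as true negatives, and the true positives are the remaining 168 - 128 = 40 correct predictions.

```scala
// Sketch: accuracy, precision and recall from confusion-matrix counts.
object ConfusionSketch {
  case class Metrics(accuracy: Double, precision: Double, recall: Double)

  def metrics(tp: Long, tn: Long, fp: Long, fn: Long): Metrics = {
    val total = (tp + tn + fp + fn).toDouble
    Metrics(
      accuracy = (tp + tn) / total,        // share of all predictions that were right
      precision = tp.toDouble / (tp + fp), // share of predicted positives that were right
      recall = tp.toDouble / (tp + fn)     // share of actual positives that were found
    )
  }

  // Counts taken from the output above, with malignant (1.0) as the positive class.
  val fromPost: Metrics = metrics(tp = 40L, tn = 128L, fp = 1L, fn = 30L)
}
```

With these counts, precision is 40/41 and recall is 40/70. At the default threshold the model rarely raises a false alarm, but it misses 30 of the 70 malignant cases in the test set, which the area under the ROC curve alone does not show.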

// use MLlib to evaluate, convert DF to RDD
val predictionAndLabels = predictions.select("rawPrediction", "label").rdd.map(x => (x(0).asInstanceOf[DenseVector](1), x(1).asInstanceOf[Double]))
val metrics = new BinaryClassificationMetrics(predictionAndLabels) 
println("area under the precision-recall curve: " + metrics.areaUnderPR)
println("area under the receiver operating characteristic (ROC) curve : " + metrics.areaUnderROC)
// A Precision-Recall curve plots (precision, recall) points for different threshold values, while a receiver operating characteristic, or ROC, curve plots (recall, false positive rate) points. The closer  the area Under ROC is to 1, the better the model is making predictions.

area under the precision-recall curve: 0.9828385182615946
area under the receiver operating characteristic (ROC) curve : 0.9926910299003322
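
One way to read the area under the ROC curve is as the fraction of (malignant, benign) pairs in which the malignant example gets the higher score. The sketch below computes it that way in plain Scala on a tiny made-up sample. It is an illustration only: the object name AucSketch is an assumption, and BinaryClassificationMetrics is not implemented by enumerating pairs like this.

```scala
// Sketch: AUC as the fraction of (positive, negative) pairs ranked correctly.
// Ties count as half a correct pair. Input is (score, label) with labels 1.0 or 0.0.
object AucSketch {
  def auc(scoresAndLabels: Seq[(Double, Double)]): Double = {
    val pos = scoresAndLabels.collect { case (s, l) if l == 1.0 => s }
    val neg = scoresAndLabels.collect { case (s, l) if l == 0.0 => s }
    require(pos.nonEmpty && neg.nonEmpty, "need at least one example of each class")
    val wins = for {
      p <- pos
      n <- neg
    } yield if (p > n) 1.0 else if (p == n) 0.5 else 0.0
    wins.sum / (pos.size.toLong * neg.size)
  }
}
```

For example, AucSketch.auc(Seq((0.9, 1.0), (0.6, 1.0), (0.7, 0.0), (0.2, 0.0))) ranks three of the four pairs correctly. The pairwise approach costs one comparison per pair, so it is only practical for small samples.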

Want to learn more?

In this blog post, we showed you how to get started using Apache Spark’s machine learning Logistic Regression for classification. If you have any further questions about this tutorial, please ask them in the comments section below.
