PySpark实战:从数据清洗到模型部署的泰坦尼克号幸存者预测完整流程

1. PySpark环境搭建与数据准备

第一次接触PySpark时,我被它处理海量数据的能力震撼到了。记得当时用传统Pandas处理一个2GB的CSV文件,内存直接爆掉,而PySpark轻松搞定。下面分享下我是如何搭建环境的,以及处理泰坦尼克号数据集的实战经验。

PySpark环境配置其实比想象中简单。我习惯用conda创建独立环境,避免包冲突:

conda create -n pyspark_env python=3.8 conda activate pyspark_env pip install pyspark==3.3.1 findspark jupyterlab

安装完成后,在Jupyter Notebook中初始化SparkSession时有个小技巧:设置spark.driver.memory可以避免内存不足的问题。我通常会这样配置:

from pyspark.sql import SparkSession spark = SparkSession.builder \ .appName("Titanic_Analysis") \ .config("spark.driver.memory", "4g") \ .getOrCreate()

泰坦尼克号数据集可以从Kaggle下载,我习惯把数据放在项目目录的data文件夹下。加载数据时发现几个常见坑点:

  • 必须指定header=True,否则第一行会被当作数据
  • inferSchema=True能自动推断数据类型,但会稍微影响性能
  • 添加.cache()能提升后续重复操作的效率
df = spark.read.csv("data/titanic.csv", header=True, inferSchema=True).cache()

2. 数据探索与可视化分析

数据探索就像侦探破案,每个线索都可能影响最终结果。我习惯先用describe()快速查看数值特征:

df.describe(["Age", "Fare", "Pclass"]).show()

输出结果会显示计数、均值、标准差等关键指标。这里发现Age有缺失值,后面需要处理。为了更直观,我常用PySpark结合Pandas做可视化:

import matplotlib.pyplot as plt # 幸存者性别分布 gender_survival = df.groupBy("Sex", "Survived").count().toPandas() gender_survival.pivot(index="Sex", columns="Survived", values="count").plot(kind="bar") plt.title("Survival by Gender")

通过分析发现几个有趣现象:

  1. 女性幸存率显著高于男性(约74% vs 19%)
  2. 头等舱乘客幸存率更高
  3. 儿童(Age<12)幸存率优于其他年龄段

这些发现将直接影响后续的特征工程策略。比如性别和舱位等级明显是强特征,而年龄可能需要分箱处理。

3. 数据清洗与特征工程

数据清洗是最耗时但最关键的环节。针对泰坦尼克号数据,我总结了以下处理步骤:

缺失值处理:

  • Age用中位数填充(比均值更抗异常值)
  • Embarked用众数'S'填充
  • Cabin字段缺失太多直接删除
from pyspark.sql.functions import median median_age = df.select(median("Age")).collect()[0][0] df = df.fillna({"Age": median_age, "Embarked": "S"}) df = df.drop("Cabin")

特征转换:

  1. 性别转为数值(StringIndexer)
  2. 登船港口做OneHot编码
  3. 票价做对数变换处理偏态
from pyspark.ml.feature import StringIndexer, OneHotEncoder sex_indexer = StringIndexer(inputCol="Sex", outputCol="SexIndex") embarked_indexer = StringIndexer(inputCol="Embarked", outputCol="EmbarkedIndex") encoder = OneHotEncoder(inputCols=["EmbarkedIndex"], outputCols=["EmbarkedVec"])

特征构造:

  1. 家庭规模 = SibSp + Parch
  2. 姓名中提取称谓(Mr/Mrs/Miss等)
  3. 年龄分箱(儿童/青年/中年/老年)
from pyspark.sql.functions import udf from pyspark.sql.types import StringType def extract_title(name): return name.split(",")[1].split(".")[0].strip() title_udf = udf(extract_title, StringType()) df = df.withColumn("Title", title_udf(df["Name"]))

4. 构建机器学习Pipeline

PySpark的Pipeline让整个流程像流水线一样清晰。我通常按这个顺序构建:

  1. 数据准备阶段:字符串索引、OneHot编码、特征缩放
  2. 特征组合:VectorAssembler合并所有特征
  3. 模型训练:添加分类器
from pyspark.ml import Pipeline from pyspark.ml.feature import VectorAssembler from pyspark.ml.classification import LogisticRegression # 定义特征列 feature_cols = ["Pclass", "SexIndex", "Age", "Fare", "EmbarkedVec"] # 创建流水线 assembler = VectorAssembler(inputCols=feature_cols, outputCol="features") lr = LogisticRegression(featuresCol="features", labelCol="Survived") pipeline = Pipeline(stages=[sex_indexer, embarked_indexer, encoder, assembler, lr])

训练时有个实用技巧:先用小样本测试管道是否畅通,再全量训练:

sample_data = df.sample(0.1) model = pipeline.fit(sample_data)

5. 模型训练与评估

我通常会对比逻辑回归和决策树两种模型。逻辑回归训练速度快,决策树更易解释。

逻辑回归实现:

# 划分训练测试集 train, test = df.randomSplit([0.8, 0.2], seed=42) # 训练模型 lr_model = pipeline.fit(train) # 评估 lr_predictions = lr_model.transform(test) from pyspark.ml.evaluation import BinaryClassificationEvaluator evaluator = BinaryClassificationEvaluator(labelCol="Survived") print("LR AUC:", evaluator.evaluate(lr_predictions))

决策树实现:

from pyspark.ml.classification import DecisionTreeClassifier dt = DecisionTreeClassifier(labelCol="Survived", featuresCol="features") dt_pipeline = Pipeline(stages=[sex_indexer, embarked_indexer, encoder, assembler, dt]) dt_model = dt_pipeline.fit(train) dt_predictions = dt_model.transform(test) print("DT AUC:", evaluator.evaluate(dt_predictions))

评估时除了AUC,我还会看混淆矩阵:

from pyspark.mllib.evaluation import MulticlassMetrics predictionAndLabels = lr_predictions.select("prediction", "Survived").rdd metrics = MulticlassMetrics(predictionAndLabels) print("Confusion Matrix:", metrics.confusionMatrix().toArray())

6. 模型优化与调参

模型第一次结果往往不理想,需要调参。PySpark的CrossValidator非常实用:

from pyspark.ml.tuning import ParamGridBuilder, CrossValidator paramGrid = (ParamGridBuilder() .addGrid(lr.regParam, [0.01, 0.1, 1.0]) .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0]) .build()) cv = CrossValidator(estimator=pipeline, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5) cv_model = cv.fit(train)

调参后模型AUC从0.78提升到了0.83。几个关键发现:

  1. 适度的L2正则化(regParam=0.1)效果最好
  2. 年龄分箱比原始年龄特征效果更好
  3. 家庭规模特征贡献度很高

7. 模型部署与生产化

训练好的模型需要持久化以便复用:

lr_model.write().overwrite().save("models/titanic_lr")

加载模型进行预测的完整流程:

from pyspark.ml import PipelineModel saved_model = PipelineModel.load("models/titanic_lr") new_data = spark.createDataFrame([ (3, "male", 22.0, 1, 0, 7.25, "S")], ["Pclass", "Sex", "Age", "SibSp", "Parch", "Fare", "Embarked"]) predictions = saved_model.transform(new_data) predictions.select("prediction").show()

在生产环境中,我推荐:

  1. 使用mlflow跟踪实验
  2. 定期用新数据重新训练模型
  3. 监控模型性能衰减

8. 项目复盘与经验总结

通过这个项目,我总结了PySpark机器学习的最佳实践:

  1. 数据预处理:占整个项目70%时间,但值得投入
  2. 特征工程:领域知识比算法选择更重要
  3. 模型选择:从简单模型开始,逐步复杂化
  4. 评估指标:选择符合业务目标的指标

常见踩坑点:

  • 忘记.cache()导致重复计算
  • 类别不平衡时没设置classWeights
  • 测试集泄露到训练数据

最后分享一个实用技巧:使用explain()方法查看Spark执行计划,能发现性能瓶颈:

df.filter(df.Age > 30).select("Survived").explain()