返回

PySpark VectorAssembler: 避坑指南,确保向量化结果一致

python

PySpark VectorAssembler 的正确使用姿势

在处理机器学习任务时,特征向量化是一个基础且重要的步骤。PySpark 中的 VectorAssembler 提供了将多个数据列组合成单个向量的功能,简化了后续模型的输入准备。本文探讨在使用 VectorAssembler 时常见的问题以及对应的解决方案,以确保向量化操作的准确性。

问题分析:输出结果不一致

一个常见的困境是 VectorAssembler 的输出结果不一致,一些行正确向量化,另一些则表现为稀疏向量或者直接为列表,而非统一的稠密向量。此问题的核心在于输入的 DataFrame 格式和 VectorAssembler 对数据类型的处理。VectorAssembler 需要所有输入的列都是数值类型,且必须能够转换为 double 类型,若存在不匹配的数据类型或者非数值数据,VectorAssembler 便可能无法按照预期进行向量化。特别地,当某些列存在空值(例如null)时,会导致输出格式不统一。在给出的案例中,很可能某些行的某些列包含 null 值或其他导致无法统一向量化的值,从而产生了格式不一的向量输出。

解决方案一:数据清洗与类型转换

首要步骤是对输入 DataFrame 进行彻底的数据清洗。使用 PySpark 提供的函数对数据进行检查、清洗和转换。将所有输入的列转换为 double 类型是关键一步,并且对 null 值或 NaN 进行填充或者删除,是避免向量化结果不一致的有效方式。以下是步骤与示例代码:

步骤:

  1. 识别问题列: 首先使用 df.printSchema() 来检查 DataFrame 的数据类型,重点关注可能出现问题的列。
  2. 缺失值处理: 对所有包含数值类型的列使用 fillna(0) 来填充空值。如果你的业务场景需要其他填充值或选择直接丢弃包含缺失值的记录,请酌情进行调整。
  3. 数据类型转换: 显式地将输入列转换为 double 类型,确保 VectorAssembler 可以正确处理。使用 .withColumn("列名", df["列名"].cast("double")) 来执行类型转换。
  4. 再次进行向量化 :使用新的 DataFrame 进行向量化操作,得到预期一致的结果。
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import col

# 初始化 SparkSession
spark = SparkSession.builder.appName("VectorAssemblerDemo").getOrCreate()

# 模拟你的DataFrame数据, 这里模拟数据和输入问题相似
data = [
    ("tag1", 1, 2, 3, 4),
    ("tag2", 5, None, 7, 8),
    ("tag3", 9, 10, 11, 12),
    ("tag4", None, 14, 15, 16),
    ("tag5", 17, 18, 19, 20)
]
columns = ["tag", "day1", "day2", "day3", "day4"]
df = spark.createDataFrame(data, columns)


# 打印Schema检查数据类型
df.printSchema()
# 清洗数据,先找出需要进行填充或者类型转换的列
numeric_columns = [col_name for col_name, col_type in df.dtypes if col_type in ['int','bigint','float', 'double'] ]
# 填充空值
for c in numeric_columns:
    df = df.fillna(0,subset=[c])
# 转换数据类型
for c in numeric_columns:
    df = df.withColumn(c,col(c).cast('double'))
# 使用VectorAssembler进行向量化
assembler = VectorAssembler(inputCols=numeric_columns, outputCol="vector")
output = assembler.transform(df)
output.select("vector").show(truncate=False)

#停止 sparkSession
spark.stop()

代码解读:

  1. 数据准备 : 我们构造了和案例中数据结构相似的数据,以便更好地解释该问题
  2. 打印Schema: 确保在做数据处理前清楚知道数据类型。
  3. 数据清洗 :通过遍历DataFrame的列信息,将列中包含null值的进行替换。确保每一个值都是可以转成Double类型的。
  4. 数据转换 : 确保所有的列数据都是 double 类型。如果之前有不同数据类型如 int , 也可以一次性通过这里转换。
  5. 向量化 : 利用VectorAssembler对目标列进行向量化。

注意事项:
在实际环境中,数据清洗过程可能更加复杂,需要根据具体的业务需求进行定制化处理,请根据实际数据的情况,选择不同的填充方式,而不是简单粗暴的选择 fillna(0).

解决方案二:使用 coalescefillna 进行预处理

除手动转换之外,可以利用 PySpark 函数在预处理阶段确保数据的一致性。coalescefillna 函数可用于替换空值或非数值,使之在数据输入到 VectorAssembler 之前更加可控,这是一种更为高效的方式。

步骤:

  1. 创建列列表: 明确需要作为输入的列。
  2. 应用 fillna 使用 fillna(0) 为输入列的所有缺失值赋予默认值。如果业务逻辑需要其他的默认值,请调整。
  3. 类型转换: 确认所有需要的输入列的类型为 double, 对于无法转化的列抛出异常,进行后续排查。
  4. 进行向量化 :使用数据清洗后的DataFrame进行向量化操作。
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import col

# 初始化 SparkSession
spark = SparkSession.builder.appName("VectorAssemblerDemo2").getOrCreate()

# 模拟你的DataFrame数据, 这里模拟数据和输入问题相似
data = [
    ("tag1", 1, 2, 3, 4),
    ("tag2", 5, None, 7, 8),
    ("tag3", 9, 10, 11, 12),
    ("tag4", None, 14, 15, 16),
    ("tag5", 17, 18, 19, 20)
]
columns = ["tag", "day1", "day2", "day3", "day4"]
df = spark.createDataFrame(data, columns)

# 提取需要作为特征向量的列
input_columns = ["day1", "day2", "day3", "day4"]

# 处理缺失值,将所有输入列的null 值填充为0,并进行数据类型转化
for c in input_columns:
   df= df.withColumn(c,col(c).cast("double"))
   df = df.fillna(0, subset=[c])



# 创建VectorAssembler实例并执行向量化
assembler = VectorAssembler(inputCols=input_columns, outputCol="vector")
output = assembler.transform(df)

# 查看向量化的结果
output.select("vector").show(truncate=False)

#停止 sparkSession
spark.stop()

代码解读:

  1. 提取特征列 : 从数据列中筛选出所有用于特征向量的列。
  2. 预处理数据 : 通过withColumn 方法, 确保每一列都可以通过cast转成double 类型, 并把 null值用 fillna 方法替换成 0
  3. 执行向量化 : 调用VectorAssembler生成特征向量。

安全建议

数据处理的过程中要对原始数据保持保护态度,确保数据来源可靠,并在数据处理过程做详细的错误和日志处理,以便在问题发生的时候快速进行故障排除和溯源。同时,确保所用函数和转换逻辑能够很好地适应输入数据的特点。

本文列举了两个 VectorAssembler 使用问题以及对应的解决方案。当数据源格式多样,并且无法确认的时候,使用数据清洗或者数据转换都是有效且安全的方式。 理解其工作原理可以帮助大家有效地应用该功能。