You can use when
to achieve this.
The variable possible_values
inside derive_column_A()
controls all the possible column labels. From it, I dynamically build a chained condition that picks the corresponding column.
Example
from pyspark.sql import functions as F
data = [("B1", "C1", "D1", "column C"),
        ("B2", "C2", "D2", "column D"),
        ("B3", "C3", "D3", "column B"),
        ("B4", "C4", "D4", "column D")]
df = spark.createDataFrame(data, ("B", "C", "D", "X"))
def derive_column_A():
    possible_values = ["column B", "column C", "column D"]
    # Start from the functions module itself: the first .when() call is
    # F.when(...), which returns a Column, and every later .when() chains
    # onto that Column, building one CASE WHEN expression.
    condition = F
    for possible_value in possible_values:
        # "column B" -> take the part after the space -> column "B"
        condition = condition.when(F.col("X") == possible_value,
                                   F.col(possible_value.split(" ")[1]))
    return condition
df.withColumn("A", derive_column_A()).show()
Output
+---+---+---+--------+---+
| B| C| D| X| A|
+---+---+---+--------+---+
| B1| C1| D1|column C| C1|
| B2| C2| D2|column D| D2|
| B3| C3| D3|column B| B3|
| B4| C4| D4|column D| D4|
+---+---+---+--------+---+
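The key step is deriving the real column name from the label stored in X: each value like "column B" is split on the space and the second token ("B") names the column to read. A minimal pure-Python sketch of that lookup (no Spark needed, the names mirror the snippet above):

```python
# Labels that may appear in column X, as in the example above.
possible_values = ["column B", "column C", "column D"]

# Map each label to the column it refers to by taking the word after the space.
mapping = {v: v.split(" ")[1] for v in possible_values}

print(mapping)  # {'column B': 'B', 'column C': 'C', 'column D': 'D'}
```

If X could ever hold a label with no matching column, you can extend the chained condition with .otherwise(...) to supply a default instead of null.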