programing

PySpark DataFrame Aggregate의 열 이름 변경

powerit 2023. 10. 25. 23:49
반응형

PySpark DataFrame Aggregate의 열 이름 변경

저는 PySpark DataFrame으로 몇 가지 데이터를 분석하고 있습니다.데이터 프레임이 있다고 가정합니다.df내가 집계하고 있는 것:

(df.groupBy("group")
   .agg({"money":"sum"})
   .show(100)
)

이것은 나에게 다음과 같은 것을 가져다 줄 것입니다.

group                SUM(money#2L)
A                    137461285853
B                    172185566943
C                    271179590646

집계는 잘 되지만 새 컬럼 이름은 마음에 들지 않습니다.SUM(money#2L). 이 칼럼을 사람이 읽을 수 있는 것으로 이름을 바꿀 방법이 있습니까?.agg방법?어쩌면 사람이 하는 것과 더 비슷한 것이dplyr:

df %>% group_by(group) %>% summarise(sum_money = sum(money))

나는 여전히 dplyr 구문을 선호하지만, 이 코드 조각은 다음을 수행합니다.

import pyspark.sql.functions as sf

(df.groupBy("group")
   .agg(sf.sum('money').alias('money'))
   .show(100))

장황해집니다.

withColumnRenamed속임수를 써야 합니다.pyspark.sql API 링크입니다.

df.groupBy("group")\
  .agg({"money":"sum"})\
  .withColumnRenamed("SUM(money)", "money")
  .show(100)

저는 이것을 위해 몇몇 사람들을 도울 수 있는 작은 도우미 기능을 만들었습니다.

import re

from functools import partial

def rename_cols(agg_df, ignore_first_n=1):
    """changes the default spark aggregate names `avg(colname)` 
    to something a bit more useful. Pass an aggregated dataframe
    and the number of aggregation columns to ignore.
    """
    delimiters = "(", ")"
    split_pattern = '|'.join(map(re.escape, delimiters))
    splitter = partial(re.split, split_pattern)
    split_agg = lambda x: '_'.join(splitter(x))[0:-ignore_first_n]
    renamed = map(split_agg, agg_df.columns[ignore_first_n:])
    renamed = zip(agg_df.columns[ignore_first_n:], renamed)
    for old, new in renamed:
        agg_df = agg_df.withColumnRenamed(old, new)
    return agg_df

예:

gb = (df.selectExpr("id", "rank", "rate", "price", "clicks")
 .groupby("id")
 .agg({"rank": "mean",
       "*": "count",
       "rate": "mean", 
       "price": "mean", 
       "clicks": "mean", 
       })
)

>>> gb.columns
['id',
 'avg(rate)',
 'count(1)',
 'avg(price)',
 'avg(rank)',
 'avg(clicks)']

>>> rename_cols(gb).columns
['id',
 'avg_rate',
 'count_1',
 'avg_price',
 'avg_rank',
 'avg_clicks']

사람들이 타자를 너무 많이 치는 것을 막기 위해 최소한의 노력을 하는 것.

간단합니다.

 val maxVideoLenPerItemDf = requiredItemsFiltered.groupBy("itemId").agg(max("playBackDuration").as("customVideoLength"))
maxVideoLenPerItemDf.show()

사용하다.asagg에 새 행 이름을 지정합니다.

.alias그리고..withColumnRenamed칼럼 이름을 하드코드화할 의사가 있다면 둘 다 가능합니다나머지 모든 열의 집합에 대해 보다 친근한 이름과 같은 프로그래밍 솔루션이 필요한 경우 이를 통해 좋은 출발점을 제공할 수 있습니다.

grouping_column = 'group'
cols = [F.sum(F.col(x)).alias(x) for x in df.columns if x != grouping_column]
(
    df
    .groupBy(grouping_column)
    .agg(
        *cols
    )
)
df = df.groupby('Device_ID').agg(aggregate_methods)
for column in df.columns:
    start_index = column.find('(')
    end_index = column.find(')')
    if (start_index and end_index):
        df = df.withColumnRenamed(column, column[start_index+1:end_index])

위의 코드는 "()" 밖에 있는 모든 것을 제거할 수 있습니다.예를 들어 "sum(foo)"은 "foo"로 이름이 바뀝니다.

import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

spark = SparkSession.builder.appName('test').getOrCreate()
data = [(1, "siva", 100), (2, "siva2", 200),(3, "siva3", 300),(4, "siva4", 400),(5, "siva5", 500)]
schema = ['id', 'name', 'sallary']

df = spark.createDataFrame(data, schema=schema)
df.show()
+---+-----+-------+
| id| name|sallary|
+---+-----+-------+
|  1| siva|    100|
|  2|siva2|    200|
|  3|siva3|    300|
|  4|siva4|    400|
|  5|siva5|    500|
+---+-----+-------+


**df.agg({"sallary": "max"}).withColumnRenamed('max(sallary)', 'max').show()**
+---+
|max|
+---+
|500|
+---+

이전에 제시된 답변은 좋지만, 사전 사용에 대한 깔끔한 대처 방법이 부족하다고 생각합니다..agg()

수백 개의 열이 있기 때문에 실제로 동적으로 생성될 수도 있는 dict를 사용하려면 수십 개의 코드 라인을 처리하지 않고 다음을 사용할 수 있습니다.

# Your dictionary-version of using the .agg()-function
# Note: The provided logic could actually also be applied to a non-dictionary approach
df = df.groupBy("group")\
   .agg({
          "money":"sum"
        , "...":  "..."
    })

# Now do the renaming
newColumnNames = ["group", "money", "..."] # Provide the names for ALL columns of the new df
df = df.toDF(*newColumnNames)              # Do the renaming

물론입니다.newColumnNames-list를 동적으로 생성할 수도 있습니다.예를 들어, 집합의 열만 추가하는 경우df미리 저장 가능합니다.newColumnNames = df.columns이름만 추가하면 됩니다.
여하튼, 당신이 그들을 위해newColumnNames이름을 바꿀 열 이름뿐만 아니라 데이터 프레임의 모든 열 이름을 포함해야 합니다(왜냐하면)..toDF()Sparks 불변 RDD)로 인해 새 데이터 프레임이 생성됩니다.

믹스를 추가할 수 있는 또 하나의 빠른 작은 라이너:

df.groupBy('group')
  .agg({'money':'sum',
        'moreMoney':'sum',
        'evenMoreMoney':'sum'
        })
    .select(*(col(i).alias(i.replace("(",'_').replace(')','')) for i in df.columns))

이름을 원하는 대로 가명 기능을 변경해 주세요.위는 sum_money, sum_moreMoney를 생성합니다. 왜냐하면 저는 변수 이름에서 연산자를 보는 것을 좋아하기 때문입니다.

언급URL : https://stackoverflow.com/questions/29988287/renaming-columns-for-pyspark-dataframe-aggregates

반응형