PySpark DataFrame Aggregate의 열 이름 변경
저는 PySpark DataFrame으로 몇 가지 데이터를 분석하고 있습니다.데이터 프레임이 있다고 가정합니다.df
내가 집계하고 있는 것:
(df.groupBy("group")
.agg({"money":"sum"})
.show(100)
)
이것은 나에게 다음과 같은 것을 가져다 줄 것입니다.
group SUM(money#2L)
A 137461285853
B 172185566943
C 271179590646
집계는 잘 되지만 새 컬럼 이름은 마음에 들지 않습니다.SUM(money#2L)
. 이 칼럼을 사람이 읽을 수 있는 것으로 이름을 바꿀 방법이 있습니까?.agg
방법?어쩌면 사람이 하는 것과 더 비슷한 것이dplyr
:
df %>% group_by(group) %>% summarise(sum_money = sum(money))
나는 여전히 dplyr 구문을 선호하지만, 이 코드 조각은 다음을 수행합니다.
import pyspark.sql.functions as sf
(df.groupBy("group")
.agg(sf.sum('money').alias('money'))
.show(100))
장황해집니다.
withColumnRenamed
속임수를 써야 합니다.pyspark.sql API 링크입니다.
df.groupBy("group")\
.agg({"money":"sum"})\
.withColumnRenamed("SUM(money)", "money")
.show(100)
저는 이것을 위해 몇몇 사람들을 도울 수 있는 작은 도우미 기능을 만들었습니다.
import re
from functools import partial
def rename_cols(agg_df, ignore_first_n=1):
"""changes the default spark aggregate names `avg(colname)`
to something a bit more useful. Pass an aggregated dataframe
and the number of aggregation columns to ignore.
"""
delimiters = "(", ")"
split_pattern = '|'.join(map(re.escape, delimiters))
splitter = partial(re.split, split_pattern)
split_agg = lambda x: '_'.join(splitter(x))[0:-ignore_first_n]
renamed = map(split_agg, agg_df.columns[ignore_first_n:])
renamed = zip(agg_df.columns[ignore_first_n:], renamed)
for old, new in renamed:
agg_df = agg_df.withColumnRenamed(old, new)
return agg_df
예:
gb = (df.selectExpr("id", "rank", "rate", "price", "clicks")
.groupby("id")
.agg({"rank": "mean",
"*": "count",
"rate": "mean",
"price": "mean",
"clicks": "mean",
})
)
>>> gb.columns
['id',
'avg(rate)',
'count(1)',
'avg(price)',
'avg(rank)',
'avg(clicks)']
>>> rename_cols(gb).columns
['id',
'avg_rate',
'count_1',
'avg_price',
'avg_rank',
'avg_clicks']
사람들이 타자를 너무 많이 치는 것을 막기 위해 최소한의 노력을 하는 것.
간단합니다.
val maxVideoLenPerItemDf = requiredItemsFiltered.groupBy("itemId").agg(max("playBackDuration").as("customVideoLength"))
maxVideoLenPerItemDf.show()
사용하다.as
agg에 새 행 이름을 지정합니다.
.alias
그리고..withColumnRenamed
칼럼 이름을 하드코드화할 의사가 있다면 둘 다 가능합니다나머지 모든 열의 집합에 대해 보다 친근한 이름과 같은 프로그래밍 솔루션이 필요한 경우 이를 통해 좋은 출발점을 제공할 수 있습니다.
grouping_column = 'group'
cols = [F.sum(F.col(x)).alias(x) for x in df.columns if x != grouping_column]
(
df
.groupBy(grouping_column)
.agg(
*cols
)
)
df = df.groupby('Device_ID').agg(aggregate_methods)
for column in df.columns:
start_index = column.find('(')
end_index = column.find(')')
if (start_index and end_index):
df = df.withColumnRenamed(column, column[start_index+1:end_index])
위의 코드는 "()" 밖에 있는 모든 것을 제거할 수 있습니다.예를 들어 "sum(foo)"은 "foo"로 이름이 바뀝니다.
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
spark = SparkSession.builder.appName('test').getOrCreate()
data = [(1, "siva", 100), (2, "siva2", 200),(3, "siva3", 300),(4, "siva4", 400),(5, "siva5", 500)]
schema = ['id', 'name', 'sallary']
df = spark.createDataFrame(data, schema=schema)
df.show()
+---+-----+-------+
| id| name|sallary|
+---+-----+-------+
| 1| siva| 100|
| 2|siva2| 200|
| 3|siva3| 300|
| 4|siva4| 400|
| 5|siva5| 500|
+---+-----+-------+
**df.agg({"sallary": "max"}).withColumnRenamed('max(sallary)', 'max').show()**
+---+
|max|
+---+
|500|
+---+
이전에 제시된 답변은 좋지만, 사전 사용에 대한 깔끔한 대처 방법이 부족하다고 생각합니다..agg()
수백 개의 열이 있기 때문에 실제로 동적으로 생성될 수도 있는 dict를 사용하려면 수십 개의 코드 라인을 처리하지 않고 다음을 사용할 수 있습니다.
# Your dictionary-version of using the .agg()-function
# Note: The provided logic could actually also be applied to a non-dictionary approach
df = df.groupBy("group")\
.agg({
"money":"sum"
, "...": "..."
})
# Now do the renaming
newColumnNames = ["group", "money", "..."] # Provide the names for ALL columns of the new df
df = df.toDF(*newColumnNames) # Do the renaming
물론입니다.newColumnNames
-list를 동적으로 생성할 수도 있습니다.예를 들어, 집합의 열만 추가하는 경우df
미리 저장 가능합니다.newColumnNames = df.columns
이름만 추가하면 됩니다.
여하튼, 당신이 그들을 위해newColumnNames
이름을 바꿀 열 이름뿐만 아니라 데이터 프레임의 모든 열 이름을 포함해야 합니다(왜냐하면)..toDF()
Sparks 불변 RDD)로 인해 새 데이터 프레임이 생성됩니다.
믹스를 추가할 수 있는 또 하나의 빠른 작은 라이너:
df.groupBy('group')
.agg({'money':'sum',
'moreMoney':'sum',
'evenMoreMoney':'sum'
})
.select(*(col(i).alias(i.replace("(",'_').replace(')','')) for i in df.columns))
이름을 원하는 대로 가명 기능을 변경해 주세요.위는 sum_money, sum_moreMoney를 생성합니다. 왜냐하면 저는 변수 이름에서 연산자를 보는 것을 좋아하기 때문입니다.
언급URL : https://stackoverflow.com/questions/29988287/renaming-columns-for-pyspark-dataframe-aggregates
'programing' 카테고리의 다른 글
Angular 6+에서 로컬 컴퓨터에서 Dist 폴더를 실행하는 방법? (0) | 2023.10.25 |
---|---|
Larlabel로 반환되는 외부 URL로 리디렉션 (0) | 2023.10.25 |
레일 앱을 Oracle에 연결할 수 없음 (0) | 2023.10.25 |
노드 4에서 ES6 클래스를 제대로 내보내는 방법은? (0) | 2023.10.25 |
리소스 정의의 매개 변수 이름에 "at" 기호를 지정합니다. (0) | 2023.10.25 |