Question:
I have the following Dataframe:

#+--------------+--------------+--------+---------+---------+
#|PROVINCE_STATE|COUNTRY_REGION|2/1/2020|2/10/2020|2/11/2020|
#+--------------+--------------+--------+---------+---------+
#|             -|     Australia|      12|       15|       15|
#+--------------+--------------+--------+---------+---------+
I need to merge all rows into one, with the date columns summed per COUNTRY_REGION. The thing is that I have many more columns and no idea how to do it dynamically. I tried groupBy, but it still doesn't work. Thanks.
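For reference, a DataFrame of this shape can be built like this (a minimal sketch; the multi-row sample values and the spark session are assumptions for testing, picked so the per-country totals come out to 12, 15 and 15):
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('example').getOrCreate()

# Hypothetical per-province rows; only the column layout matches the real data.
df = spark.createDataFrame(
    [("new south wales", "Australia", 4, 4, 4),
     ("victoria",        "Australia", 4, 4, 4),
     ("queensland",      "Australia", 3, 5, 5),
     ("south australia", "Australia", 1, 2, 2)],
    ["PROVINCE_STATE", "COUNTRY_REGION", "2/1/2020", "2/10/2020", "2/11/2020"])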
Answer 1:
If your first two columns are always province and country and the remaining n columns are dates, you can try the below (Scala):
import org.apache.spark.sql.functions._
val dateCols = df.columns.drop(2).map(c => sum(c).as(c)) // select all columns except first 2 and perform sum on each of them
df.groupBy('country_region).agg(dateCols.head,dateCols.tail:_*).show()
python version:
import pyspark.sql.functions as f
dateCols = [f.sum(c).alias(c) for c in df.columns[2:]]  # sum every column except the first two, keeping the original names
df.groupBy('country_region').agg(*dateCols).show()
output:
+--------------+--------+---------+---------+
|country_region|2/1/2020|2/10/2020|2/11/2020|
+--------------+--------+---------+---------+
|           aus|      12|       15|       15|
+--------------+--------+---------+---------+
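A side note, not from the answer itself: agg also accepts a dict mapping column names to aggregate function names, so a rough equivalent using the df above would be the following, though the result columns come back named sum(2/1/2020) and so on unless renamed afterwards.
# Dict form of the same dynamic aggregation; result columns are named sum(<col>).
df.groupBy('country_region').agg({c: 'sum' for c in df.columns[2:]}).show()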
Answer 2:
Use aggregation:
select '-' as province_state, country_region,
sum(`2/1/2020`), sum(`2/10/2020`), sum(`2/11/2020`)
from t
group by country_region;
I'm not sure what you mean by "dynamically". As a SQL query, you need to list each expression independently.
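If "dynamically" means not typing every date column by hand, the query string itself can be generated from df.columns; a sketch, assuming the DataFrame is registered as a temp view named t and that the first two columns are the non-date ones:
# Build the sum expressions from the column list and run the query with Spark SQL.
df.createOrReplaceTempView('t')
date_exprs = ', '.join('sum(`{0}`) as `{0}`'.format(c) for c in df.columns[2:])
query = ("select '-' as province_state, country_region, " + date_exprs +
         " from t group by country_region")
spark.sql(query).show()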
Answer 3:
Try this.
from pyspark.sql import functions as F
from dateutil.parser import parse
def is_date(string, fuzzy=False):
    # Treat a column name as a date column if dateutil can parse it.
    try:
        parse(string, fuzzy=fuzzy)
        return True
    except ValueError:
        return False

df.groupBy(F.lit('-').alias("PROVINCE_STATE"), 'COUNTRY_REGION')\
  .agg(*((F.sum(x)).cast('int').alias(x) for x in df.columns if is_date(x))).show()
#+--------------+--------------+--------+---------+---------+
#|PROVINCE_STATE|COUNTRY_REGION|2/1/2020|2/10/2020|2/11/2020|
#+--------------+--------------+--------+---------+---------+
#|             -|     Australia|      12|       15|       15|
#+--------------+--------------+--------+---------+---------+
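As a quick sanity check of the helper (an illustrative snippet, not part of the answer), you can list the columns it treats as dates:
# Only the date-like column names parse successfully.
print([c for c in df.columns if is_date(c)])
# ['2/1/2020', '2/10/2020', '2/11/2020']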
Answer 4:
Try this in pyspark. One way of doing this is using window functions:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
spark = SparkSession.builder \
    .appName('SO') \
    .getOrCreate()
sc= spark.sparkContext
df = sc.parallelize([
    ("new south wales", "aus", 4, 4, 4),
    ("victoria", "aus", 4, 4, 4),
    ("queensland", "aus", 3, 5, 5),
    ("south australia", "aus", 1, 2, 2)
]).toDF(["province_state", "country_region", "2/1/2020", "2/10/2020", "2/11/2020"])
df.show()
#
# +---------------+--------------+--------+---------+---------+
# | province_state|country_region|2/1/2020|2/10/2020|2/11/2020|
# +---------------+--------------+--------+---------+---------+
# |new south wales|           aus|       4|        4|        4|
# |       victoria|           aus|       4|        4|        4|
# |     queensland|           aus|       3|        5|        5|
# |south australia|           aus|       1|        2|        2|
# +---------------+--------------+--------+---------+---------+
w = Window.partitionBy('country_region')
w1 = Window.partitionBy('country_region').orderBy('country_region')

# Replace every date column with its per-country sum computed over the window.
for column in df.columns:
    if column not in ['country_region', 'province_state']:
        df = df.withColumn(column, F.sum(column).over(w))

# Keep one row per country and put back a placeholder province_state column.
df1 = df.withColumn("r_no", F.row_number().over(w1)).where(F.col('r_no') == 1)
df1.select(F.lit('_').alias('province_state'),
           *[column for column in df1.columns if column not in ['province_state']])\
   .drop(F.col('r_no')).show()
# +--------------+--------------+--------+---------+---------+
# |province_state|country_region|2/1/2020|2/10/2020|2/11/2020|
# +--------------+--------------+--------+---------+---------+
# |             _|           aus|      12|       15|       15|
# +--------------+--------------+--------+---------+---------+
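As an aside (not the answer's own code): once the window sums have been applied, every row of a country carries the same date values, so the row_number() filter could also be replaced by dropDuplicates on the grouping key, roughly:
# Keep any one row per country; the summed date columns are identical across rows.
df.dropDuplicates(['country_region'])\
  .select(F.lit('_').alias('province_state'),
          *[c for c in df.columns if c != 'province_state'])\
  .show()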