
I have a dataset of the form:

[image of the sample dataset; a text version is in the EDIT below]

Now, for each combination of id1 and id2 (so suppose id1 = 1 and id2 = 2), and for each date value, I want to pick the value from rows that lie within one week before and one week after the date in the current row, but in the previous year.

So, for example, for id1 = 1, id2 = 2, date = 2023-06-01, I want to fetch the value column from rows with id1 = 1, id2 = 2 and date between 2022-05-25 and 2022-06-08 (the same date one year earlier, plus/minus 7 days), then explode those values into new columns.
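For concreteness, this is how I mean the window to be computed for that row, using a flat 365-day shift for "previous year" (so leap years are ignored):

from datetime import date, timedelta

d = date(2023, 6, 1)
prev_year = d - timedelta(days=365)           # 2022-06-01
window_start = prev_year - timedelta(days=7)  # 2022-05-25
window_end = prev_year + timedelta(days=7)    # 2022-06-08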

So the data will finally look like this:

[image of the target dataset; a text version is in the EDIT below]

Not all days may be present in this range; missing days will be filled with 0 later on. But I need the data in sorted order, so if a day in the middle of the range is missing, that gap should be reflected in the resulting columns.

How can I do this in PySpark, or even pandas? I thought of using windows and joins, but couldn't figure it out.

Any help is greatly appreciated.

EDIT:

An example:

id1 id2 date    value
0   1   2021-12-28  24
0   1   2021-12-30  24
0   1   2022-01-04  24
0   1   2022-01-06  24
0   1   2022-01-07  8
0   1   2022-01-11  16
0   1   2022-01-13  16
0   1   2023-01-03  16
0   1   2023-01-05  56

The target dataset should look like:

id1 id2 date    value   day-7   day-6   day-5   day-4   day-3   day-2   day-1   day-0   day+1   day+2   day+3   day+4   day+5   day+6   day+7
0   1   2021-12-28  24  0   0   0   0   0   0   0   0   0   0   0   0   0   0   0
0   1   2021-12-30  24  0   0   0   0   0   0   0   0   0   0   0   0   0   0   0
0   1   2022-01-04  24  0   0   0   0   0   0   0   0   0   0   0   0   0   0   0
0   1   2022-01-06  24  0   0   0   0   0   0   0   0   0   0   0   0   0   0   0
0   1   2022-01-07  8   0   0   0   0   0   0   0   0   0   0   0   0   0   0   0
0   1   2022-01-11  16  0   0   0   0   0   0   0   0   0   0   0   0   0   0   0
0   1   2022-01-13  16  0   0   0   0   0   0   0   0   0   0   0   0   0   0   0
0   1   2023-01-03  16  0   24  0   24  0   0   0   0   24  0   24  8   0   0   0
0   1   2023-01-05  56  0   24  0   0   0   0   24  0   24  8   0   0   0   16  0
  • Images of data are not acceptable on SO; you must provide a minimal reproducible example
    – mozway
    Commented Jul 10 at 9:58
  • I think when you say "So for example, id1 = 1, id2 = 2, date = 2023-06-01 ...", you meant id2 = 1, right? Please clarify this when you provide the minimal reproducible example.
    Commented Jul 10 at 10:21
  • Sorry for the bad formatting, but I've added sample data.
    – abtExp
    Commented Jul 10 at 10:40
  • @PawanTolani id1 and id2 can be anything; they're two different identifiers, and one combination represents one entity. So id1 = 1 and id2 = 2 represents one entity, while id1 = 1 and id2 = 1 represents a different entity.
    – abtExp
    Commented Jul 10 at 10:42
  • @mozway I've added the sample data
    – abtExp
    Commented Jul 10 at 10:42

1 Answer


Here is my solution using PySpark.

from pyspark.sql.functions import (
    array, col, date_sub, datediff, explode, first, to_date,
)

ab = spark.read.csv("dbfs:/FileStore/test_only/check/spark_4-1.csv", header=True)  # this is your input data
ab = ab.withColumn("date", to_date(col("date"), "dd-MM-yyyy"))
ab = ab.withColumn("value", col("value").cast("int"))  # cast so fillna(0) below works
ab = ab.filter(col("date").isNotNull())

# Candidate dates in the previous year: offsets 372 down to 358 days back cover
# (date - 365) - 7 ... (date - 365) + 7, i.e. day-7 ... day+7 (15 dates,
# including offset 365 itself for day-0)
ab = ab.withColumn("array", array(*[date_sub(col("date"), 365 + d) for d in range(7, -8, -1)]))

bc = ab.withColumn("new", explode(col("array")))  # one row per candidate date
new_list = ["ab_id1", "ab_id2", "ab_date", "ab_value", "ab_array"]
ab = ab.toDF(*new_list)  # renaming columns to differentiate between the base and intermediate datasets

# Left join, so every row survives even when a candidate date has no match in the base data
cd = bc.join(
    ab,
    (ab["ab_id1"] == bc["id1"]) & (ab["ab_id2"] == bc["id2"]) & (ab["ab_date"] == bc["new"]),
    how="left",
)

# Pivot on the day offset (-7 ... +7) rather than the raw matched date, so every
# row gets the same fixed set of columns; rename them to day-7 ... day+7 if needed
cd = cd.withColumn("offset", datediff(col("new"), date_sub(col("date"), 365)))
cd = cd.select("id1", "id2", "date", "value", "offset", "ab_value")
ef = (
    cd.groupby("id1", "id2", "date", "value")
    .pivot("offset", list(range(-7, 8)))
    .agg(first(col("ab_value")))
    .fillna(0)
)
ef.display()  # pivot and display the output
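If you'd rather stay in pandas (the question allows either), here is a minimal sketch of the same idea using a keyed lookup instead of an explode-and-join. It reuses the sample data from the EDIT, and the flat 365-day shift mirrors the PySpark code above, so it will be a day off across leap years.

import pandas as pd

df = pd.DataFrame({
    "id1": [0] * 9,
    "id2": [1] * 9,
    "date": pd.to_datetime([
        "2021-12-28", "2021-12-30", "2022-01-04", "2022-01-06", "2022-01-07",
        "2022-01-11", "2022-01-13", "2023-01-03", "2023-01-05",
    ]),
    "value": [24, 24, 24, 24, 8, 16, 16, 16, 56],
})

# Series indexed by (id1, id2, date); assumes one row per combination
lookup = df.set_index(["id1", "id2", "date"])["value"]

for offset in range(-7, 8):
    # Date to probe in the previous year: date - 365 days + offset
    probe = df["date"] - pd.Timedelta(days=365 - offset)
    key = pd.MultiIndex.from_arrays([df["id1"], df["id2"], probe])
    colname = f"day-{-offset}" if offset <= 0 else f"day+{offset}"  # day-7 ... day-0 ... day+7
    df[colname] = lookup.reindex(key).fillna(0).to_numpy()

For the 2023-01-03 row this fills day-6 and day-4 with 24, day+1 and day+3 with 24, and day+4 with 8, matching the target table in the question.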
