Here is a standard approach.
Point to note: you need to be working with an RDD. I think that is the bottleneck.
Here you go:
val keysWithValuesList = Array("foo=A", "foo=A", "foo=A", "foo=A", "foo=B", "bar=C","bar=D", "bar=D")
val sample=keysWithValuesList.map(_.split("=")).map(p=>(p(0),(p(1))))
val sample2 = sc.parallelize(sample.map(x => (x._1, 1)))
val sample3 = sample2.reduceByKey(_+_)
sample3.collect()
val sample4 = sc.parallelize(sample.map(x => (x._1, x._2))).groupByKey()
sample4.collect()
val sample5 = sample4.map(x => (x._1, x._2.toSet))
sample5.collect()
0
solved How to get the specified output without combineByKey and aggregateByKey in spark RDD