spark数据清洗练习
- 软件开发
- 2025-08-20 05:48:02

文章目录 准备工作删除缺失值 >= 3 的数据删除星级、评论数、评分中任意字段为空的数据删除非法数据hotel_data.csv 通过编写Spark程序清洗酒店数据里的缺失数据、非法数据、重复数据 准备工作 搭建 hadoop 伪分布或 hadoop 完全分布上传 hotal_data.csv 文件到 hadoopidea 配置好 scala 环境 删除缺失值 >= 3 的数据 读取 /hotel_data.csv删除缺失值 >= 3 的数据, 打印剔除的数量将清洗后的数据保存为/hotelsparktask1 import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object Demo01 { def main(args: Array[String]): Unit = { // System.setProperty("HADOOP_USER_NAME", "root")//解决保存文件权限不够的问题 val config: SparkConf = new SparkConf().setMaster("local[1]").setAppName("1") val sc = new SparkContext(config) val hdfsUrl ="hdfs://192.168.226.129:9000" val filePath: String = hdfsUrl+"/file3_1/hotel_data.csv" val data: RDD[Array[String]] = sc.textFile(filePath).map(_.split(",")).cache() val total: Long = data.count() val dataDrop: RDD[Array[String]] = data.filter(_.count(_.equals("NULL")) <= 3) println("删除的数据条目有: " + (total - dataDrop.count())) dataDrop.map(_.mkString(",")).saveAsTextFile(hdfsUrl+ "/hotelsparktask1") sc.stop() } } 删除星级、评论数、评分中任意字段为空的数据 读取 /hotel_data.csv将字段{星级、评论数、评分}中任意字段为空的数据删除, 打印剔除的数量保存 /hotelsparktask2 import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object Demo02 { def main(args: Array[String]): Unit = { System.setProperty("HADOOP_USER_NAME", "root") val config: SparkConf = new SparkConf().setMaster("local[1]").setAppName("2") val sc = new SparkContext(config) val hdfsUrl ="hdfs://192.168.226.129:9000" val filePath: String = hdfsUrl+"/file3_1/hotel_data.csv" val data: RDD[Array[String]] = sc.textFile(filePath).map(_.split(",")).cache() val total: Long = data.count() val dataDrop: RDD[Array[String]] = data.filter { arr: Array[String] => !(arr(6).equals("NULL") || arr(10).equals("NULL") || arr(11).equals("NULL")) } println("删除的数据条目有: " + (total - dataDrop.count())) dataDrop.map(_.mkString(",")).saveAsTextFile(hdfsUrl+ "/hotelsparktask2") sc.stop() } } 删除非法数据 读取第一题的 /hotelsparktask1剔除数据集中评分和星级字段的非法数据,合法数据是评分[0,5]的实数,星级是指星级字段内容中包含 NULL、二星、三星、四星、五星的数据剔除数据集中的重复数据分别打印 删除含有非法评分、星级以及重复的数据条目数保存 /hotelsparktask3 import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object Demo03 { def main(args: Array[String]): Unit = { System.setProperty("HADOOP_USER_NAME", "root")//解决权限问题 val config: SparkConf = new SparkConf().setMaster( "local[1]").setAppName("3") val sc = new SparkContext(config) val hdfsUrl ="hdfs://192.168.226.129:9000" val filePath: String = hdfsUrl+"/hotelsparktask1" val lines: RDD[String] = sc.textFile(filePath).cache() val data: RDD[Array[String]] = lines.map(_.split(",")) val total: Long = data.count() val dataDrop: RDD[Array[String]] = data.filter { arr: Array[String] => try { (arr(10).toDouble >= 0) && (arr(10).toDouble <= 5) } catch { case _: Exception => false } } val lab = Array("NULL", "一星", "二星", "三星", "四星", "五星") val dataDrop1: RDD[Array[String]] = data.filter { arr: Array[String] => var flag = false for (elem <- lab) { if (arr(6).contains(elem)) { flag = true } } flag } val dataDrop2: RDD[String] = lines.distinct println("删除的非法评分数据条目有: " + (total - dataDrop.count())) println("删除的非法星级数据条目有: " + (total - dataDrop1.count())) println("删除重复数据条目有: " + (total - dataDrop2.count())) val wordsRdd: RDD[Array[String]] = lines.distinct.map(_.split(",")).filter { arr: Array[String] => try { (arr(10).toDouble >= 0) && (arr(10).toDouble <= 5) } catch { case _: Exception => false } }.filter { arr: Array[String] => var flag = false for (elem <- lab) { if (arr(6).contains(elem)) { flag = true } } flag } wordsRdd.map(_.mkString(",")) .saveAsTextFile(hdfsUrl + "/hotelsparktask3") sc.stop() } } hotel_data.csv
下载数据: download.csdn.net/download/weixin_44018458/87437211
spark数据清洗练习由讯客互联软件开发栏目发布,感谢您对讯客互联的认可,以及对我们原创作品以及文章的青睐,非常欢迎各位朋友分享到个人网站或者朋友圈,但转载请说明文章出处“spark数据清洗练习”