本文共 1632 字,大约阅读时间需要 5 分钟。
学习spark了一段时间需要把各个功能做个串联,实现个简单相对完整的例子,所以尝试使用spark读取 hdfs中的数据,进行处理后,把结果写入mysql数据。
需要用到测试数据(hdfs上的文本数据)请自行下载,或者 到
#! /usr/bin/env python# This Python file uses the following encoding: utf-8'''Created on Nov 27, 2017@author: root'''import pandas as pdfrom pyspark.sql import SparkSessionfrom pyspark import SparkContextfrom pyspark.sql import SQLContextdef map_extract(element): file_path, content = element year = file_path[-8:-4] return [(year, i) for i in content.split("\r\n") if i]spark = SparkSession\ .builder\ .appName("PythonTest")\ .getOrCreate()res = spark.sparkContext.wholeTextFiles('hdfs://localhost:8020/datatnt/names', minPartitions=40) \ .map(map_extract) \ .flatMap(lambda x: x) \ .map(lambda x: (x[0], int(x[1].split(',')[2]))) \ .reduceByKey(lambda x,y:x+y)df = res.toDF(["key","num"]) #把已有数据列改成和目标mysql表的列的名字相同# print(dir(df))df.printSchema()print(df.show())df.printSchema()df.write.format("jdbc").options( url="jdbc:mysql://localhost:3306/leaf", driver="com.mysql.cj.jdbc.Driver", dbtable="spark", user="root", password="xx@123456").mode('append').save()其中dataframe中的schama 相关信息:
|-- key: string (nullable = true)
|-- num: long (nullable = true) +----+-------+ | key| num| +----+-------+ |1922|2289200| |1939|2203241| |1980|3444418| |1923|2302512| |1938|2212260| |1981|3458968| |1924|2381673| |1889| 288947| |1925|2333334| |1888| 299475| |1887| 247396| |1942|2731426| |1926|2295922| |1886| 255317| |1943|2822095| |1927|2319230| |1928|2260773| |1955|4012933| |1885| 240854| |1940|2302361| +----+-------+ only showing top 20 rows转载地址:http://cjfob.baihongyu.com/