博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
spark-python从hdfs文本数据(美国历年出生人数)统计然后把结果数据写入mysql
阅读量:2396 次
发布时间:2019-05-10

本文共 1632 字,大约阅读时间需要 5 分钟。

学习spark了一段时间需要把各个功能做个串联,实现个简单相对完整的例子,所以尝试使用spark读取 hdfs中的数据,进行处理后,把结果写入mysql数据。

需要用到测试数据(hdfs上的文本数据)请自行下载,或者 到

#! /usr/bin/env python# This Python file uses the following encoding: utf-8'''Created on Nov 27, 2017@author: root'''import pandas as pdfrom pyspark.sql import SparkSessionfrom pyspark import SparkContextfrom pyspark.sql import SQLContextdef map_extract(element):    file_path, content = element    year = file_path[-8:-4]    return [(year, i) for i in content.split("\r\n") if i]spark = SparkSession\    .builder\    .appName("PythonTest")\    .getOrCreate()res = spark.sparkContext.wholeTextFiles('hdfs://localhost:8020/datatnt/names',                        minPartitions=40)  \        .map(map_extract) \        .flatMap(lambda x: x) \        .map(lambda x: (x[0], int(x[1].split(',')[2]))) \        .reduceByKey(lambda x,y:x+y)df = res.toDF(["key","num"])  #把已有数据列改成和目标mysql表的列的名字相同# print(dir(df))df.printSchema()print(df.show())df.printSchema()df.write.format("jdbc").options(    url="jdbc:mysql://localhost:3306/leaf",    driver="com.mysql.cj.jdbc.Driver",    dbtable="spark",    user="root",    password="xx@123456").mode('append').save()
其中dataframe中的schama 相关信息:

 |-- key: string (nullable = true)

 |-- num: long (nullable = true)
+----+-------+
| key|    num|
+----+-------+
|1922|2289200|
|1939|2203241|
|1980|3444418|
|1923|2302512|
|1938|2212260|
|1981|3458968|
|1924|2381673|
|1889| 288947|
|1925|2333334|
|1888| 299475|
|1887| 247396|
|1942|2731426|
|1926|2295922|
|1886| 255317|
|1943|2822095|
|1927|2319230|
|1928|2260773|
|1955|4012933|
|1885| 240854|
|1940|2302361|
+----+-------+
only showing top 20 rows

转载地址:http://cjfob.baihongyu.com/

你可能感兴趣的文章
Active Data Guard初探(一)
查看>>
MySQL和Oracle中的半连接测试总结(一)
查看>>
数据库无响应问题的紧急处理和分析
查看>>
GoldenGate复制的几个简单测试
查看>>
GoldenGate数据迁移的问题总结(一)
查看>>
GoldenGate简单复制环境的搭建
查看>>
使用SQL来分析数据库参数(二)
查看>>
《阿尔卑斯》观后感
查看>>
《共同警戒区》观后感(r10笔记第97天)
查看>>
深入理解Oracle中的DBCA
查看>>
12C打补丁的简单尝试
查看>>
近期的计划和安排
查看>>
Java随机算法(一)(r11笔记第14天)
查看>>
insert导致的性能问题大排查(r11笔记第26天)
查看>>
MySQL修复表的简单分析
查看>>
一些对我影响重大的流行歌曲(r11笔记第32天)
查看>>
MySQL中的半同步复制(r11笔记第65天)
查看>>
MySQL误操作数据恢复的简单实践(r11笔记第67天)
查看>>
值得推荐的几部日本电影(三)(r10笔记第29天)
查看>>
MySQL中insert语句没有响应的问题分析(r11笔记第21天)
查看>>