from pyspark import SparkContext
from pyspark.sql import SQLContext

if __name__ == "__main__":
    sc = SparkContext(appName="local")
    sqlContext = SQLContext(sc)
    # Read the test_customer table from MySQL over JDBC
    df = sqlContext.read.format("jdbc").options(
        url="jdbc:mysql://localhost:3306/test?user=root&password=root",
        dbtable="test_customer").load()
    df.show()
    sc.stop()
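The same read can also be expressed with DataFrameReader.jdbc, passing the credentials as connection properties instead of embedding them in the URL. A minimal sketch, assuming the same local MySQL instance, database, table, and root/root credentials as above:

from pyspark import SparkContext
from pyspark.sql import SQLContext

if __name__ == "__main__":
    sc = SparkContext(appName="local")
    sqlContext = SQLContext(sc)
    # Credentials go in a properties dict rather than the URL
    df = sqlContext.read.jdbc(
        url="jdbc:mysql://localhost:3306/test",
        table="test_customer",
        properties={"user": "root", "password": "root"})
    df.show()
    sc.stop()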
If you get a "no suitable driver" error, copy the MySQL JDBC connector jar into the jars folder of your Spark installation.
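Alternatively, the application itself can point Spark at the connector jar and name the driver class. A sketch only, with a hypothetical jar path /path/to/mysql-connector-java.jar and the com.mysql.jdbc.Driver class; depending on the deploy mode these settings may have to be passed to spark-submit instead, and copying the jar into the jars folder as above remains the most reliable fix:

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext

if __name__ == "__main__":
    # Hypothetical jar location; adjust to wherever the MySQL connector lives
    conf = (SparkConf()
            .setAppName("local")
            .set("spark.jars", "/path/to/mysql-connector-java.jar")
            .set("spark.driver.extraClassPath", "/path/to/mysql-connector-java.jar"))
    sc = SparkContext(conf=conf)
    sqlContext = SQLContext(sc)
    # Naming the driver class explicitly also helps DriverManager locate it
    df = sqlContext.read.format("jdbc").options(
        url="jdbc:mysql://localhost:3306/test?user=root&password=root",
        dbtable="test_customer",
        driver="com.mysql.jdbc.Driver").load()
    df.show()
    sc.stop()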
Appendix: the same read, queried with SQL
from pyspark import SparkContext
from pyspark.sql import SQLContext

if __name__ == "__main__":
    sc = SparkContext(appName="local")
    sqlContext = SQLContext(sc)
    df = sqlContext.read.format("jdbc").options(
        url="jdbc:mysql://localhost:3306/test?user=root&password=root",
        dbtable="test_customer").load()
    # Register the DataFrame as a temporary table so it can be queried with SQL
    df.registerTempTable("test1")
    ls = sqlContext.sql("select * from test1 where did = 1").collect()
    for it in ls:
        print(it)
    sc.stop()
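The same filter can also be written with the DataFrame API, without registering a temporary table. A small sketch, reusing df from the block above and assuming the table has a did column:

# Equivalent filter using the DataFrame API (no temp table needed)
rows = df.filter(df.did == 1).collect()
for row in rows:
    print(row)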
Also appended: using HiveContext and converting an RDD to a DataFrame:
from pyspark import SparkContext
from pyspark.sql import HiveContext, SQLContext, Row

if __name__ == "__main__":
    sc = SparkContext(appName="local")
    hc = HiveContext(sc)          # HiveContext
    sqlContext = SQLContext(sc)   # SQLContext
    datas = ["1 a 28", "2 b 29", "3 c 30"]
    source = sc.parallelize(datas)                     # load the list as an RDD
    splits = source.map(lambda line: line.split(" "))  # map returns a new RDD
    rows = splits.map(lambda words: Row(id=words[0], name=words[1], age=words[2]))
    structType = hc._inferSchema(rows)                 # derive the StructType from the rows
    people = sqlContext.createDataFrame(rows, structType)  # build a DataFrame from the rows and the StructType
    people.registerTempTable("people")                 # register the temp table
    results = hc.sql("select name from people").collect()
    # results1 = results.map(lambda row: row.name.upper()).collect()
    for result in results:
        print("name:" + result.name)
    sc.stop()
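The private _inferSchema helper can be avoided by declaring the schema explicitly with StructType/StructField from pyspark.sql.types. A sketch, reusing sc, sqlContext, and splits from the block above; every field is a string because split produces strings:

from pyspark.sql.types import StructType, StructField, StringType

# Build (id, name, age) tuples and describe them with an explicit schema
tuples = splits.map(lambda words: (words[0], words[1], words[2]))
schema = StructType([
    StructField("id", StringType(), True),
    StructField("name", StringType(), True),
    StructField("age", StringType(), True),
])
people2 = sqlContext.createDataFrame(tuples, schema)
people2.registerTempTable("people2")
print(sqlContext.sql("select name from people2").collect())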