DF 类似于二维表的数据结果

mame

age

狗山石23
获取df的列名: df.columns

显示当前值 打印 df.show() show(2) show括号里面传入参数可以显示查看几行 show(2,False)  False 是否全部显示 False 不隐藏
获取前10行数据 df.limit(10) 里面传递的一个整形 后面加上show() 可以打印

获取列值key df.select(["key"]) 传入参数写法 df.select([df[x] for x in keys]) 后面加上show() 可以打印

将每一行转化为json 并将行名,命名为wang df.select(to_json(struct([df["key"]])).alias("wang")).show() 

 df.select() 操作

from pyspark.sql.functions import to_json, struct,concat

# 将每一行转化为json 并将行名,命名为wang
df.select(to_json(struct([df["key"]])).alias("wang"))
# 将每一行转化为字符串 并将行名,命名为data

df.select(concat(*df.columns).alias('data'))

# 在窗口调试后面加上 show() 可以打印
 df.select() 操作  # 在窗口调试后面加上 show() 可以打印

df.select(["*"]) # 选择全部数据
df.select(["name"]) # 选择对应列操作

df 的写入操作

df.select(to_json(struct(["key","json"])).alias("value")).write.format("kafka").option("kafka.bootstrap.servers",','.join(["emr2-header-1.ipa.aidigger.com:6667", "emr2-header-2.ipa.
aidigger.com:6667"])).option("topic","text").save()

df.write 写入操作

写入kafka  
to_json(struct(["key","json"])).alias("value")  把df转化为json格式

df.select(to_json(struct(["key","json"])).alias("value")).write.format("kafka").option("kafka.bootstrap.servers",','.join(["ip", "ip
"])).option("topic","主题名字").save()

from pyspark.sql.functions import to_json, struct,concat
df.select(concat(*df.columns).alias('data')).show()


收藏的博客 

PySpark SQL常用语法 df   https://www.jianshu.com/p/177cbcb1cb6f

使用PySpark将kafka数据写入ElasticSearch  https://blog.csdn.net/qq_37050993/article/details/90606527

Pyspark DataFrame读写  https://www.jianshu.com/p/d1f6678db183

pyspark读写操作  https://blog.csdn.net/zyj20200/article/details/81697786#33-%E5%86%99%E5%88%B0hive

pyspark系列--日期函数 https://blog.csdn.net/suzyu12345/article/details/79673569

pyspark系列  https://blog.csdn.net/suzyu12345/category_6653162.html

12-28 03:14