培养大数据应用开发工程师,学生综合运用所学软件系统开发的基础理论和专业知识,掌握软件系统开发的设计与实现的流程、方法与技术,并具备解决软件系统开发过程中出现的一般问题的能力;同时培养学生务实、细致、严格、认真和吃苦耐劳的工作作风。
进一步提高学生所学的软件系统开发的理论知识水平,训练学生软件开发的动手能力,掌握软件系统开发过程中需求分析、概要设计、详细设计、功能测试等流程所需技能;贴近工作实际,让学生在对某个项目进行需求分析、功能确认后,完成整个系统架构的搭建以及功能实现,能独立完成开发工具的选择、功能模块的划分、数据库的设计以及软件功能的测试等任务,达到提高软件系统的设计与实现能力的目的。
通过毕业设计,使学生对软件系统的设计和实现的全过程有比较全面的了解,熟悉软件开发的有关流程、软件平台和开发工具的特点,为今后独立工作打下基础。
核心算法代码分享如下:
#Flink连接HDFS上面的CSV文件 使用Flink_SQL分析完入表
## 启动hadoop
## cd /data/hadoop/sbin
## sh /data/hadoop/sbin/start-all.sh
## 启动hive
## cd /data/hive
## nohup hive --service metastore &
## nohup hive --service hiveserver2 &
import os
from pyflink.common import Row
from pyflink.table import (EnvironmentSettings, TableEnvironment, TableDescriptor, Schema,
DataTypes, FormatDescriptor)
from pyflink.table.expressions import lit, col
from pyflink.table.udf import udtf
import logging
import sys
logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")
env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)
jars = []
for file in os.listdir(os.path.abspath(os.path.dirname(__file__))):
if file.endswith('.jar'):
file_path = os.path.abspath(file)
jars.append(file_path)
str_jars = ';'.join(['file:///' + jar for jar in jars])
table_env.get_config().get_configuration().set_string("pipeline.jars", str_jars)
# table_env.get_config().get_configuration().set_float('taskmanager.memory.network.fraction',0.8)
# table_env.get_config().get_configuration().set_string('taskmanager.memory.network.min','8gb')
# table_env.get_config().get_configuration().set_string('taskmanager.memory.network.max','16gb')
table_env.get_config().get_configuration().set_string('parallelism.default','1')
#先读取hadoop_hdfs上的CSV文件
table_env.execute_sql(
"""
CREATE TABLE `ods_flight` (
`start_city` string COMMENT '出发城市',
`end_city` string COMMENT '到达城市',
`stime` string COMMENT '出发日期',
`airline_name` string COMMENT '航班名称',
`flight_info` string COMMENT '飞机详细信息',
`flight_type1` string COMMENT '飞机型号',
`flight_type2` string COMMENT '飞机系列',
`setup_time` string COMMENT '出发时间',
`arr_time` string COMMENT '到达时间',
`start_airport` string COMMENT '起飞机场和航站楼',
`arr_airport` string COMMENT '到达机场和航站楼',
`ontime_rate` bigint COMMENT '准点率',
`flight_total_time` string COMMENT '飞行时间字符串',
`price` bigint COMMENT '价格',
`price_desc` string COMMENT '优惠折扣力度',
`flight_company` string COMMENT '航空公司',
`flight_type3` string COMMENT '飞行性质',
`setup_time_math` double COMMENT '出发时间_数字辅助',
`arr_time_math` double COMMENT '到达时间_数字辅助',
`arr_time2` string COMMENT '第几天到达',
`start_airport_simple` string COMMENT '起飞机场',
`arr_airport_simple` string COMMENT '到达机场',
`flight_total_time_math` bigint COMMENT '飞行时长_数字辅助',
`price_desc_math` double COMMENT '优惠折扣力度_数字辅助'
) WITH(
'connector' = 'filesystem',
'path' ='hdfs://bigdata:9000/flink_fliggy_flight/flight/hdfs_flights.csv',
'format' = 'csv'
)
"""
)
#设置下沉到mysql的表
table_env.execute_sql(
"""
create table tables01(
`airline_name` string primary key ,
`price_desc_math` double ,
`price_desc` string ,
`stime` string
) WITH(
'connector' = 'jdbc',
'url' = 'jdbc:mysql://bigdata:3306/Flink_Fliggy_Flight',
'table-name' = 'tables01',
'username' = 'root',
'password' = '123456',
'driver' = 'com.mysql.jdbc.Driver'
)
"""
)
#数据分析并且导入
#result=table_env.sql_query("select * from ods_zymk limit 10 ")
table_env.execute_sql("""
insert into tables01
select distinct airline_name,price_desc_math,price_desc,stime
from ods_flight
order by stime desc,price_desc_math asc
limit 30
""").wait()
#print("表结构",result.get_schema())
#print("数据检查",result.to_pandas())