flink跑通项目
2023-07-13 18:24:39 70浏览
flink跑通项目,连接flink,通过flink传入实时数据,然后对实时数据进行检测处理。
一、完成算法
二、完成封装
将所有功能都封装成函数以备调用
三、提交封装后的代码到 git
如图所示:提交 abnormal_ip.py
四、安装 apache-flink 包,配置 flink 环境
如果出现如图错误,换源进行安装
五、配置好环境后创建 TableEnvironment
env_settings = EnvironmentSettings.in_streaming_mode()
t_env = TableEnvironment.create(env_settings)
t_env.create_temporary_system_function('abnormal_ip', anomaly)
# specify connector and format jars
t_env.get_config().set("pipeline.jars",
"file:///D:/Pycharm/projects/workstation/UEBA1/flink-connector-jdbc-3.1.0-1.17.jar;file:///D:/Pycharm/projects/workstation/UEBA1/mysql-connector-java-5.1.49.jar;file:///D:/Pycharm/projects/workstation/UEBA1/flink-sql-connector-kafka-1.17.1.jar")
t_env.get_config().set("sql-client.execution.result-mode", "TABLEAU")
t_env.get_config().set("pipline.parallelism", "1")
六、下载 flink 包,图中的三个 jar 文件
七、和 kafka 建立连接并从 kafka 中接收数据并转换成表
#建立kafka连接,并接受数据
kafka_meta_tree = """
CREATE TABLE kafka_meta_tree (
`before` MAP<STRING, STRING>,
`after` MAP<STRING, STRING>,
`offset` BIGINT METADATA VIRTUAL,
`op` STRING,
`ts` TIMESTAMP(3) METADATA FROM 'timestamp',
WATERMARK FOR `ts` AS `ts` - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'ueba-login-log',
'scan.startup.mode' = 'earliest-offset',
'properties.bootstrap.servers' = '192.168.8.65:9092',
'format' = 'json'
);
"""
八、从 kafka 中接收的数据中查询所需要的数据传入函数中
select_sql = """
select
after['id'] as id,
after['ancestors'] as user_id,
after['meta_code'] as login_ip,
after['meta_name'] as login_date
FROM
kafka_meta_tree
LIMIT 50;
"""
t_env.execute_sql(select_sql)
# 执行SQL查询
result_table = t_env.sql_query(select_sql)
# 定义查询语句
result_sql = """
SELECT
user_id,
abnormal_ip(user_id,login_ip,login_date) AS anomaly
FROM
{}
""".format(result_table)
九、在函数中将传入的数据转换成算法封装后的接口所需要的数据结构
@udf(result_type='INT')
def anomaly(user_id,login_ip,login_date):
#创建一个空列表
params = []
# 添加参数到列表中
params.append(user_id)
params.append(login_ip)
params.append(login_date)
#print(params)
# 定义列名
columns = ['user_id', 'login_ip', 'login_date']
# 将列表转换为DataFrame
df = pd.DataFrame([params],columns=columns)
#df = pd.DataFrame({'user_id': pd.Series(params[0]),
#'login_ip': pd.Series(params[1]),
#'login_date': pd.Series(params[2])})
#print(df)
return check(df)
十、算法计算后返回参数
def check(df_log_one):
from tasks import task_check, task_baseline
# 将DataFrame转换为列表
data_list = df_log_one.values.tolist()
result1 = task_check.delay(df_log_one.to_json()) # 异步调用任务1
result2 = task_baseline.delay(df_log_one.to_json()) # 异步调用任务2
print(result2.backend)
return result1
十一、如果检测到异常登陆则需要发送报警信息到钉钉
# 设置你的access_token和要发送的消息内容
your_access_token = "8fc773185b6437b1b138b88207b148adf7a5b76fc08ed414d77c65d39ccd2a9f"
your_message = "登陆异常!"
# 发送消息
send_message_to_dingtalk(your_access_token, your_message)
十二、上线
好博客就要一起分享哦!分享海报
此处可发布评论
评论(0)展开评论
暂无评论,快来写一下吧
展开评论