flink跑通项目

奋斗吧
奋斗吧
擅长邻域:未填写

标签: flink跑通项目 博客 51CTO博客

2023-07-13 18:24:39 70浏览

flink跑通项目,连接flink,通过flink传入实时数据,然后对实时数据进行检测处理。

一、完成算法

二、完成封装

     将所有功能都封装成函数以备调用

三、提交封装后的代码到 git

     如图所示:提交 abnormal_ip.py

     

flink跑通项目_sql

四、安装 apache-flink 包,配置 flink 环境

环境安装 | Apache Flink

如果出现如图错误,换源进行安装

flink跑通项目_kafka_02


五、配置好环境后创建 TableEnvironment

env_settings = EnvironmentSettings.in_streaming_mode()
    t_env = TableEnvironment.create(env_settings)
    t_env.create_temporary_system_function('abnormal_ip', anomaly)
    # specify connector and format jars
    t_env.get_config().set("pipeline.jars",
                           "file:///D:/Pycharm/projects/workstation/UEBA1/flink-connector-jdbc-3.1.0-1.17.jar;file:///D:/Pycharm/projects/workstation/UEBA1/mysql-connector-java-5.1.49.jar;file:///D:/Pycharm/projects/workstation/UEBA1/flink-sql-connector-kafka-1.17.1.jar")
    t_env.get_config().set("sql-client.execution.result-mode", "TABLEAU")
    t_env.get_config().set("pipline.parallelism", "1")

六、下载 flink 包,图中的三个 jar 文件


flink跑通项目_jar_03


七、和 kafka 建立连接并从 kafka 中接收数据并转换成表

#建立kafka连接,并接受数据
kafka_meta_tree = """
       CREATE TABLE kafka_meta_tree (
         `before` MAP<STRING, STRING>,
         `after`  MAP<STRING, STRING>,
         `offset` BIGINT METADATA VIRTUAL,
         `op` STRING,
         `ts` TIMESTAMP(3) METADATA FROM 'timestamp',
          WATERMARK FOR `ts` AS `ts` - INTERVAL '5' SECOND
       ) WITH (
           'connector' = 'kafka',
           'topic' = 'ueba-login-log',
           'scan.startup.mode' = 'earliest-offset',
           'properties.bootstrap.servers' = '192.168.8.65:9092',
           'format' = 'json'
       );
       """

八、从 kafka 中接收的数据中查询所需要的数据传入函数中

select_sql = """
       select
       after['id'] as id,
       after['ancestors'] as user_id,
       after['meta_code'] as login_ip,
       after['meta_name'] as login_date
       FROM 
            kafka_meta_tree
       
       LIMIT 50;            
       
       """

    t_env.execute_sql(select_sql)
    # 执行SQL查询
    result_table = t_env.sql_query(select_sql)

    # 定义查询语句
    result_sql = """
        SELECT 
            user_id,
            abnormal_ip(user_id,login_ip,login_date) AS anomaly
        FROM    
            {}
    """.format(result_table)

九、在函数中将传入的数据转换成算法封装后的接口所需要的数据结构

@udf(result_type='INT')
    def anomaly(user_id,login_ip,login_date):
        #创建一个空列表
        params = []

        # 添加参数到列表中
        params.append(user_id)
        params.append(login_ip)
        params.append(login_date)
        #print(params)
        # 定义列名
        columns = ['user_id', 'login_ip', 'login_date']
        # 将列表转换为DataFrame
        df = pd.DataFrame([params],columns=columns)
        #df = pd.DataFrame({'user_id': pd.Series(params[0]),
                           #'login_ip': pd.Series(params[1]),
                           #'login_date': pd.Series(params[2])})
        #print(df)
        return check(df)

十、算法计算后返回参数

def check(df_log_one):
    from tasks import task_check, task_baseline

    # 将DataFrame转换为列表
    data_list = df_log_one.values.tolist()
    result1 = task_check.delay(df_log_one.to_json())  # 异步调用任务1
    result2 = task_baseline.delay(df_log_one.to_json())  # 异步调用任务2
    print(result2.backend)
    return result1

十一、如果检测到异常登陆则需要发送报警信息到钉钉

# 设置你的access_token和要发送的消息内容
 your_access_token = "8fc773185b6437b1b138b88207b148adf7a5b76fc08ed414d77c65d39ccd2a9f"
your_message = "登陆异常!"

# 发送消息
send_message_to_dingtalk(your_access_token, your_message)

十二、上线


好博客就要一起分享哦!分享海报

此处可发布评论

评论(0展开评论

暂无评论,快来写一下吧

展开评论

您可能感兴趣的博客

客服QQ 1913284695