내가 보려고 만든 블로그

Pyflink 간단한 사용법 본문

Data Engineering/Streaming

Pyflink 간단한 사용법

정의김 2023. 3. 24. 11:23

요새 kafka, flink, spark 등등 자바로 만들어진 어플리케이션들도 python을 워낙 많이 지원하고 있어서 편리하다. 

Flink도 pyflink 라고 파이썬에서 flink 개발을 할 수 있도록 도와주는 라이브러리가 있어서 이를 간단히 정리하고자 한다.

사실 이 글은 지금 패스트캠퍼스에 있는 강의를 참고해서 올리는데 코드의 세부 로직보다는 이런식으로 사용을 하는구나 정도를 정리함.

flink를 사용하는 방법이 여러가지가 있지만 쿼리형태로 source table , sink table 지정해서 사용하는게 제일 편리한 것 같아 이방법으로 정리함.

 

 flink에서 가장 많이 사용되는 형태인 Kafka 를 통해 메세지를 받고 이를 가공해서 file이나 db등으로 보내는 것 을 예제로 담음.

( kafka 메세지를 가공하고 hdfs에 csv를 적재하는 예제임 )

 

{text: "hello world", timestamp: 13443252350 } , {text: "hi", timestamp: 1323252350 }

와 같이 Json 형태의 데이터가 producing되면 이를 consuming한후 가공하여 hdfs에 csv로 적재하는 예제.

import os
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, EnvironmentSettings, DataTypes
from pyflink.table.udf import udf 

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
t_env = StreamTableEnvironment.create(env, environment_settings=settings)

kafka_jar_path = os.path.join(
  os.path.abspath(os.path.dirname(__file__)), "../",
  "flink-sql-connector-kafka_2.11-1.14.0.jar"
)

hadoop_jar_path = os.path.join(
    os.path.abspath(os.path.dirname(__file__)), "../",
    "flink-filesystem-hadoop-2.8.3-1.14.0.jar"
)
jar_path = kafka_jar_path + ';' + hadoop_jar_path

t_env.get_config().get_configuration().set_string(
  "pipeline.jars", jar_path
)

source_query = """
  CREATE TABLE tweets (
    text STRING,
    timestamp_ms BIGINT,
    ts AS TO_TIMESTAMP_LTZ(timestamp_ms, 3),
    WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
  ) WITH (
    'connector' = 'kafka',
    'topic' = 'korean-tweets',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'tweet-group',
    'format' = 'json',
    'scan.startup.mode' = 'latest-offset'
  )
"""

sink_query = """
  CREATE TABLE sink (
    word STRING,
    count BIGINT
  ) WITH (
    'connector' = 'filesystem',
    'path' = 'hdfs://localhost:9000/output',
    'format' = 'csv'
  )
"""

t_env.execute_sql(source_query)
t_env.execute_sql(sink_query)

t_env.sql_query("""
  SELECT word, COUNT(*)
  FROM (
    SELECT explode(split(text, ' ')) AS word
    FROM tweets
  )
  GROUP BY word
""").execute_insert("sink")

env.execute("flink-kafka-hdfs")

1. 가장 먼저 spark , kafka 를 사용할 때 처럼 ,  만들어줄 flink application과 관련한 env, setting 정의해주면 된다 . 

- set_parralelism 같은 경우 flink application이 데이터를 병렬처리할 수준을 정해줌 (쓰레드)

- streaming_mode와 함께 blink planner라는 것이 있는데 쿼리 옵티마이저를 선택해주는 부분이다. blink planner가 옵티마이징이 뛰어나다고 한다.

2.  하둡 , 카프카를 사용하는데 필요한 jar파일 path 지정을 해주고

3. source_query 와 sink query 부분을 각각 작성해줌.

(source =input으로 들어오는 데이터 -  kafka , sink = output으로 나가는 데이터- hdfs 를 정의)

source_query를 tweet이라는 테이블명 , sink_query를 sink 라는 테이블로 이름 지어줌 .

(스파크에서 createOrTempView 생각하면 됨 ). 

4. t_env.execute_sql 은 실제로 실행해주는 부분은 아님 . 스파크의 lazy evaluation 생각하면됨 . 

5. t_env.sql_query().execute_insert("sink") -> source_table tweet을 어떻게 가공해서 sink 테이블로 만들어줄지 부분을 정의하고 적재 할수 있도록 함 . 

6. env.exeute -> 위에서 정의한 job을 "flink-kafka_hdfs"이름의 application으로 flink 클러스터에 제출함.