내가 보려고 만든 블로그
Pyflink 간단한 사용법 본문
요새 kafka, flink, spark 등등 자바로 만들어진 어플리케이션들도 python을 워낙 많이 지원하고 있어서 편리하다.
Flink도 pyflink 라고 파이썬에서 flink 개발을 할 수 있도록 도와주는 라이브러리가 있어서 이를 간단히 정리하고자 한다.
사실 이 글은 지금 패스트캠퍼스에 있는 강의를 참고해서 올리는데 코드의 세부 로직보다는 이런식으로 사용을 하는구나 정도를 정리함.
flink를 사용하는 방법이 여러가지가 있지만 쿼리형태로 source table , sink table 지정해서 사용하는게 제일 편리한 것 같아 이방법으로 정리함.
flink에서 가장 많이 사용되는 형태인 Kafka 를 통해 메세지를 받고 이를 가공해서 file이나 db등으로 보내는 것 을 예제로 담음.
( kafka 메세지를 가공하고 hdfs에 csv를 적재하는 예제임 )
{text: "hello world", timestamp: 13443252350 } , {text: "hi", timestamp: 1323252350 }
와 같이 Json 형태의 데이터가 producing되면 이를 consuming한후 가공하여 hdfs에 csv로 적재하는 예제.
import os
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, EnvironmentSettings, DataTypes
from pyflink.table.udf import udf
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
t_env = StreamTableEnvironment.create(env, environment_settings=settings)
kafka_jar_path = os.path.join(
os.path.abspath(os.path.dirname(__file__)), "../",
"flink-sql-connector-kafka_2.11-1.14.0.jar"
)
hadoop_jar_path = os.path.join(
os.path.abspath(os.path.dirname(__file__)), "../",
"flink-filesystem-hadoop-2.8.3-1.14.0.jar"
)
jar_path = kafka_jar_path + ';' + hadoop_jar_path
t_env.get_config().get_configuration().set_string(
"pipeline.jars", jar_path
)
source_query = """
CREATE TABLE tweets (
text STRING,
timestamp_ms BIGINT,
ts AS TO_TIMESTAMP_LTZ(timestamp_ms, 3),
WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'korean-tweets',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'tweet-group',
'format' = 'json',
'scan.startup.mode' = 'latest-offset'
)
"""
sink_query = """
CREATE TABLE sink (
word STRING,
count BIGINT
) WITH (
'connector' = 'filesystem',
'path' = 'hdfs://localhost:9000/output',
'format' = 'csv'
)
"""
t_env.execute_sql(source_query)
t_env.execute_sql(sink_query)
t_env.sql_query("""
SELECT word, COUNT(*)
FROM (
SELECT explode(split(text, ' ')) AS word
FROM tweets
)
GROUP BY word
""").execute_insert("sink")
env.execute("flink-kafka-hdfs")
1. 가장 먼저 spark , kafka 를 사용할 때 처럼 , 만들어줄 flink application과 관련한 env, setting 정의해주면 된다 .
- set_parralelism 같은 경우 flink application이 데이터를 병렬처리할 수준을 정해줌 (쓰레드)
- streaming_mode와 함께 blink planner라는 것이 있는데 쿼리 옵티마이저를 선택해주는 부분이다. blink planner가 옵티마이징이 뛰어나다고 한다.
2. 하둡 , 카프카를 사용하는데 필요한 jar파일 path 지정을 해주고
3. source_query 와 sink query 부분을 각각 작성해줌.
(source =input으로 들어오는 데이터 - kafka , sink = output으로 나가는 데이터- hdfs 를 정의)
source_query를 tweet이라는 테이블명 , sink_query를 sink 라는 테이블로 이름 지어줌 .
(스파크에서 createOrTempView 생각하면 됨 ).
4. t_env.execute_sql 은 실제로 실행해주는 부분은 아님 . 스파크의 lazy evaluation 생각하면됨 .
5. t_env.sql_query().execute_insert("sink") -> source_table tweet을 어떻게 가공해서 sink 테이블로 만들어줄지 부분을 정의하고 적재 할수 있도록 함 .
6. env.exeute -> 위에서 정의한 job을 "flink-kafka_hdfs"이름의 application으로 flink 클러스터에 제출함.