~ Prerequisites: create a Kinesis data stream
~ Create an S3 bucket
~ Create a Glue database
1. Create Table for Data Stream
%flink.ssql
CREATE TABLE sourcetable (
`event_time` TIMESTAMP(3),
`status` VARCHAR(6),
`current_temperature` DOUBLE,
`sensor_id` INTEGER,
WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
)
WITH (
'connector' = 'kinesis',
'stream' = 'wsi-kinesis-input-stream',
'aws.region' = 'ap-northeast-2',
'scan.stream.initpos' = 'LATEST',
'format' = 'json',
'json.timestamp-format.standard' = 'ISO-8601'
)
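To feed the source table above, a producer must put JSON records onto the stream whose fields match the table schema. Below is a minimal, hypothetical producer sketch (the stream name and region are from this post; the sensor values and the `make_record` helper are illustrative). The timestamp is written in ISO-8601 with millisecond precision so that `'json.timestamp-format.standard' = 'ISO-8601'` can parse it into `TIMESTAMP(3)`.

```python
# Sketch of a record producer matching the sourcetable schema.
# make_record() is an illustrative helper, not part of the original post.
import json
import random
from datetime import datetime, timezone

def make_record():
    temperature = round(random.uniform(10.0, 40.0), 1)
    return {
        # ISO-8601 with millisecond precision, matching TIMESTAMP(3)
        "event_time": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3],
        "status": "OK" if temperature < 35.0 else "WARN",  # fits VARCHAR(6)
        "current_temperature": temperature,
        "sensor_id": random.randint(1, 10),
    }

# Sending with boto3 (requires AWS credentials; shown for illustration only):
# import boto3
# kinesis = boto3.client("kinesis", region_name="ap-northeast-2")
# record = make_record()
# kinesis.put_record(
#     StreamName="wsi-kinesis-input-stream",
#     Data=json.dumps(record),
#     PartitionKey=str(record["sensor_id"]),
# )
```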
2. Create a table to put stream data into S3
%flink.ssql(type=update)
CREATE TABLE sinktable (
`event_time` TIMESTAMP(3),
`status` VARCHAR(6),
`current_temperature` DOUBLE,
`sensor_id` INTEGER
)
PARTITIONED BY (status)
WITH (
'connector'='filesystem',
'path' = 's3://wsi-kinesis-output-bucket/result',
'format' = 'csv',
'sink.partition-commit.policy.kind'='success-file',
'sink.partition-commit.delay' = '1 min'
)
3. Checkpointing settings (the filesystem sink only commits files on checkpoints, so checkpointing must be enabled)
%flink.pyflink
# Exactly-once checkpointing, triggered every minute
st_env.get_config().get_configuration().set_string(
    "execution.checkpointing.mode", "EXACTLY_ONCE"
)
st_env.get_config().get_configuration().set_string(
    "execution.checkpointing.interval", "1min"
)
4. Put Stream Data into S3 Bucket
%flink.ssql(type=update)
INSERT INTO sinktable SELECT * FROM sourcetable
IAM:
Attach an S3 access policy to the Studio notebook's execution role
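A minimal policy sketch for that role, assuming the output bucket name from step 2 (scope the actions down further if your job only writes):

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "s3:GetObject",
        "s3:PutObject",
        "s3:DeleteObject",
        "s3:ListBucket",
        "s3:GetBucketLocation"
      ],
      "Resource": [
        "arn:aws:s3:::wsi-kinesis-output-bucket",
        "arn:aws:s3:::wsi-kinesis-output-bucket/*"
      ]
    }
  ]
}
```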