
[AWS] KDA Studio


 

~ Create a Kinesis Data Stream

~ Create an S3 bucket

~ Create a Glue database (KDA Studio uses it as the notebook's table catalog)

 

1. Create a source table for the Kinesis data stream

 

%flink.ssql

-- Source table over the Kinesis input stream (JSON records)
CREATE TABLE sourcetable (
   `event_time` TIMESTAMP(3),
   `status` VARCHAR(6),
   `current_temperature` DOUBLE,
   `sensor_id` INTEGER,
   -- event-time watermark: tolerate up to 5 seconds of out-of-order data
   WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
)
WITH (
    'connector' = 'kinesis',
    'stream' = 'wsi-kinesis-input-stream',
    'aws.region' = 'ap-northeast-2',
    'scan.stream.initpos' = 'LATEST',  -- read only records produced after the job starts
    'format' = 'json',
    'json.timestamp-format.standard' = 'ISO-8601'
)

 

2. Create a sink table to put stream data into S3

%flink.ssql(type=update)

CREATE TABLE sinktable (
     `event_time` TIMESTAMP(3),
     `status` VARCHAR(6),
     `current_temperature` DOUBLE,
     `sensor_id` INTEGER
)
PARTITIONED BY (status)  -- one S3 "directory" per status value
WITH (
    'connector' = 'filesystem',
    'path' = 's3://wsi-kinesis-output-bucket/result',
    'format' = 'csv',
    -- commit partitions with a _SUCCESS marker file, after a 1 minute delay
    'sink.partition-commit.policy.kind' = 'success-file',
    'sink.partition-commit.delay' = '1 min'
)

 

3. Configure checkpointing

%flink.pyflink

# Checkpointing must be enabled: the filesystem sink only finalizes
# (commits) files to S3 when a checkpoint completes.
st_env.get_config().get_configuration().set_string(
    "execution.checkpointing.mode", "EXACTLY_ONCE"
)
st_env.get_config().get_configuration().set_string(
    "execution.checkpointing.interval", "1min"  # checkpoint once per minute
)

 

4. Put stream data into the S3 bucket

%flink.ssql(type=update)
INSERT INTO sinktable SELECT * FROM sourcetable

 

IAM:

Attach an S3 access policy to the Studio notebook's execution role so the job can write to the output bucket.

 

p.s. https://skills123.tistory.com/25