[AWS] KDA Studio
~ Create a Kinesis data stream
~ Create an S3 bucket
~ Create a Glue database (KDA Studio uses the Glue Data Catalog as its table catalog)
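The three prerequisites can be created in the console or with a short boto3 sketch like the one below. The stream and bucket names are taken from the table definitions in this note; the shard count and the Glue database name wsi_glue_db are assumptions, so adjust them to your environment.

import boto3

region = "ap-northeast-2"

# Kinesis data stream used as the Flink source
kinesis = boto3.client("kinesis", region_name=region)
kinesis.create_stream(StreamName="wsi-kinesis-input-stream", ShardCount=1)

# S3 bucket used as the Flink sink; regions other than us-east-1 need a LocationConstraint
s3 = boto3.client("s3", region_name=region)
s3.create_bucket(
    Bucket="wsi-kinesis-output-bucket",
    CreateBucketConfiguration={"LocationConstraint": region},
)

# Glue database that backs the Studio notebook's table catalog (name is an assumption)
glue = boto3.client("glue", region_name=region)
glue.create_database(DatabaseInput={"Name": "wsi_glue_db"})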
1. Create a Source Table for the Data Stream
%flink.ssql
CREATE TABLE sourcetable (
    `event_time` TIMESTAMP(3),
    `status` VARCHAR(6),
    `current_temperature` DOUBLE,
    `sensor_id` INTEGER,
    WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
)
WITH (
    'connector' = 'kinesis',
    'stream' = 'wsi-kinesis-input-stream',
    'aws.region' = 'ap-northeast-2',
    'scan.stream.initpos' = 'LATEST',
    'format' = 'json',
    'json.timestamp-format.standard' = 'ISO-8601'
)
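To push a test record in the shape this table expects, a minimal boto3 producer sketch follows; the status values and the choice of sensor_id as partition key are assumptions. The event_time string is ISO-8601 with millisecond precision, matching the json.timestamp-format.standard option above.

import datetime
import json
import random

import boto3

kinesis = boto3.client("kinesis", region_name="ap-northeast-2")

record = {
    # ISO-8601 timestamp, e.g. 2024-01-01T12:00:00.000
    "event_time": datetime.datetime.utcnow().isoformat(timespec="milliseconds"),
    "status": random.choice(["OK", "WARN", "ERROR"]),  # fits VARCHAR(6)
    "current_temperature": round(random.uniform(10.0, 40.0), 1),
    "sensor_id": random.randint(1, 100),
}

kinesis.put_record(
    StreamName="wsi-kinesis-input-stream",
    Data=json.dumps(record).encode("utf-8"),
    PartitionKey=str(record["sensor_id"]),
)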
2. Create a Sink Table to Put Stream Data into S3
%flink.ssql(type=update)
CREATE TABLE sinktable (
    `event_time` TIMESTAMP(3),
    `status` VARCHAR(6),
    `current_temperature` DOUBLE,
    `sensor_id` INTEGER
)
PARTITIONED BY (status)
WITH (
    'connector' = 'filesystem',
    'path' = 's3://wsi-kinesis-output-bucket/result',
    'format' = 'csv',
    'sink.partition-commit.policy.kind' = 'success-file',
    'sink.partition-commit.delay' = '1 min'
)
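With PARTITIONED BY (status) and the success-file commit policy, committed files land under per-status directories such as s3://wsi-kinesis-output-bucket/result/status=OK/, each with a _SUCCESS marker once the commit delay has passed. In streaming mode the filesystem sink only finalizes files when a checkpoint completes, which is why checkpointing is configured in the next step.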
3. Configure Checkpointing
%flink.pyflink
# Exactly-once checkpointing; the filesystem sink commits files on checkpoint completion
st_env.get_config().get_configuration().set_string(
    "execution.checkpointing.mode", "EXACTLY_ONCE"
)
# Checkpoint (and therefore commit) every minute
st_env.get_config().get_configuration().set_string(
    "execution.checkpointing.interval", "1min"
)
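As an optional check, the values can be read back in the same notebook session (Configuration.get_string takes a key and a default):

%flink.pyflink
conf = st_env.get_config().get_configuration()
print(conf.get_string("execution.checkpointing.mode", "not set"))
print(conf.get_string("execution.checkpointing.interval", "not set"))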
4. Put Stream Data into S3 Bucket
%flink.ssql(type=update)
INSERT INTO sinktable SELECT * FROM sourcetable
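Once the INSERT job has been running past the 1 min commit delay, the output can be verified by listing the result prefix; a small boto3 sketch run outside the notebook:

import boto3

s3 = boto3.client("s3", region_name="ap-northeast-2")
resp = s3.list_objects_v2(Bucket="wsi-kinesis-output-bucket", Prefix="result/")
for obj in resp.get("Contents", []):
    print(obj["Key"], obj["Size"])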
IAM:
Add an S3 access policy to the Studio notebook's IAM role so the Flink job can write to the output bucket
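A minimal sketch of attaching an inline S3 policy with boto3; the role name wsi-kda-studio-role and the exact action list are assumptions, so substitute the execution role actually attached to the Studio notebook.

import json

import boto3

iam = boto3.client("iam")

policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "s3:ListBucket",
                "s3:GetObject",
                "s3:PutObject",
                "s3:DeleteObject",
                "s3:AbortMultipartUpload",
            ],
            "Resource": [
                "arn:aws:s3:::wsi-kinesis-output-bucket",
                "arn:aws:s3:::wsi-kinesis-output-bucket/*",
            ],
        }
    ],
}

iam.put_role_policy(
    RoleName="wsi-kda-studio-role",  # hypothetical name; use the notebook's execution role
    PolicyName="s3-sink-access",
    PolicyDocument=json.dumps(policy),
)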