controllers/flinkcluster/flinkcluster_submit_job_script.go (134 lines of code) (raw):
/*
Copyright 2020 Google LLC.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package flinkcluster
// This script is part of the cluster's ConfigMap and is mounted into the
// job (submitter) container at `/opt/flink-operator/submit-job.sh` for job
// submission.
var submitJobScript = `
#! /usr/bin/env bash
# This script checks the status of the JobManager and submits the Flink job when ready.
# When job submission is finished, the result is recorded in the pod termination log in the following YAML format.
#
# When succeeded:
#
# jobID: ec74209eb4e3db8ae72db00bd7a830aa
# message: |
#  Successfully submitted!
#  /opt/flink/bin/flink run --jobmanager flinkjobcluster-sample-jobmanager:8081 --class org.apache.flink.streaming.examples.wordcount.WordCount --parallelism 2 --detached ./examples/streaming/WordCount.jar --input ./README.txt
#  Starting execution of program
#  Printing result to stdout. Use --output to specify output path.
#  Job has been submitted with JobID ec74209eb4e3db8ae72db00bd7a830aa
#
# When submission fails (no jobID):
#
# message: |
#  Aborted submit because JobManager is unavailable.
#  Checking job manager to be ready. Will check success of 3 API calls for stable job submission.
#  curl -sS "http://t3-jobmanager:8081/jobs"
#  curl: (7) Failed to connect to t3-jobmanager port 8081: Connection refused
#  ...
#
# NOTE(review): the embedding Go raw string begins with a newline, so this
# shebang ends up on line 2 of the generated file and is not honored by the
# kernel; presumably the file is always run as "bash <path>" - confirm.

# Abort on failing commands, unset variables, and failures anywhere in a pipeline.
set -euo pipefail

# Pod termination log that receives the final result summary.
readonly TERM_LOG="/dev/termination-log"
# Bytes written to TERM_LOG so far (maintained by write_term_log).
TERM_LOG_SIZE=0
# Byte budget for TERM_LOG; presumably chosen to match the Kubernetes
# per-container termination message limit - confirm.
readonly TERM_LOG_MAX_SIZE=4096
#######################################
# Prints a message to stdout and also appends it to a log file.
# Backslash escapes in the message (e.g. "\n") are interpreted, as echo -e does.
# Arguments:
#   $1 - message text
#   $2 - path of the log file to append to
#######################################
function echo_log() {
  local -r text="$1" target="$2"
  echo -e "${text}" | tee -a "${target}"
}
#######################################
# Appends one line to the pod termination log unless that would push the log
# past TERM_LOG_MAX_SIZE bytes.
# Globals:
#   TERM_LOG (read), TERM_LOG_MAX_SIZE (read), TERM_LOG_SIZE (read/written)
# Arguments:
#   $1 - line to write (written verbatim; backslashes are not interpreted)
# Returns:
#   0 when the line was NOT written because the size budget is exhausted;
#   non-zero (the written byte count, capped at 255) when it was written.
#   NOTE: inverse of the usual shell convention - existing callers rely on it.
#######################################
function write_term_log() {
  local -r msg="$1"
  local size
  # Measure exactly the bytes appended below (message plus trailing newline).
  size=$(printf '%s\n' "${msg}" | wc -c)
  if ((TERM_LOG_SIZE + size > TERM_LOG_MAX_SIZE)); then
    return 0
  fi
  printf '%s\n' "${msg}" >>"${TERM_LOG}"
  TERM_LOG_SIZE=$((TERM_LOG_SIZE + size))
  # Exit statuses wrap modulo 256, so a raw count of 256, 512, ... would read
  # as 0 ("not written"); cap it so a successful write is always non-zero.
  return $((size > 255 ? 255 : size))
}
#######################################
# Writes a result message followed by the lines of a log file to the pod
# termination log as a YAML "message" literal block scalar. Stops appending
# log lines as soon as write_term_log reports that the budget is exhausted.
# Arguments:
#   $1 - one-line result summary
#   $2 - path of the log file whose lines are appended to the message
#######################################
function write_term_log_msg() {
  local -r result_msg="$1"
  local -r log_file="$2"
  local line=""
  local written

  # Write result message. write_term_log returns non-zero when it DID write,
  # so its status is swallowed to keep this function safe under "set -e".
  write_term_log "message: |" || true
  write_term_log " ${result_msg}" || true

  # Append the log to the message. Every line gets one leading space so it
  # stays inside the YAML literal block scalar.
  # "IFS=" applies only to read (keeps surrounding whitespace without
  # clobbering the global IFS); the "|| [[ -n ... ]]" keeps a final line that
  # has no trailing newline.
  while IFS= read -r line || [[ -n "${line}" ]]; do
    written=0
    write_term_log " ${line}" || written=$?
    # A 0 status means the line did not fit: stop appending.
    if ((written == 0)); then
      break
    fi
  done <"${log_file}"
}
#######################################
# Polls the JobManager REST API (/jobs) until it has answered successfully
# REQUIRED_SUCCESS_NUMBER times (not necessarily consecutively), trying at most
# MAX_RETRY times with RETRY_INTERVAL between attempts. Progress is printed and
# appended to ./job_check_log; when giving up, that log is written to the
# termination log.
# Relies on the script-wide "pipefail" so a curl failure fails the pipeline.
# Globals:
#   FLINK_JM_ADDR (read) - JobManager REST address (host:port)
# Returns:
#   0 once the JobManager is ready, 1 after MAX_RETRY attempts.
#######################################
function check_jm_ready() {
  # Roughly 10 minutes of polling (MAX_RETRY x RETRY_INTERVAL), plus curl time.
  local -r MAX_RETRY=120
  local -r RETRY_INTERVAL=5s
  local -r REQUIRED_SUCCESS_NUMBER=3
  local -r CONNECT_TIMEOUT=5
  # Bound on a whole request, so a JobManager that accepts the connection but
  # never answers cannot stall the loop forever.
  local -r MAX_TIME=10
  local success_count=0
  local i

  echo_log "Checking job manager to be ready. Will check success of ${REQUIRED_SUCCESS_NUMBER} API calls for stable job submission." "job_check_log"
  for ((i = 1; i <= MAX_RETRY; i++)); do
    echo_log "curl -sS \"http://${FLINK_JM_ADDR}/jobs\"" "job_check_log"
    # --fail: an HTTP error status (e.g. 503 while the JobManager is still
    # starting) must not count as a successful API call.
    if curl -sS --fail --connect-timeout "${CONNECT_TIMEOUT}" --max-time "${MAX_TIME}" \
      "http://${FLINK_JM_ADDR}/jobs" 2>&1 | tee -a job_check_log; then
      # Plain assignment: ((success_count++)) evaluates to 0 on the first
      # success, which returns status 1 and would trip "set -e".
      success_count=$((success_count + 1))
      echo_log "\nSuccess ${success_count}/${REQUIRED_SUCCESS_NUMBER}" "job_check_log"
      if ((success_count >= REQUIRED_SUCCESS_NUMBER)); then
        echo_log "\nJob manager is ready now. Tried ${i} time(s), every ${RETRY_INTERVAL} and succeeded ${success_count} time(s)." "job_check_log"
        return 0
      fi
    fi
    echo_log "\nWaiting..." "job_check_log"
    sleep "${RETRY_INTERVAL}"
  done

  echo_log "\nReached max retry count(${MAX_RETRY}) to check job manager status." "job_check_log"
  echo_log "Aborted to submit job." "job_check_log"
  write_term_log_msg "Aborted submit because JobManager is unavailable." "job_check_log"
  return 1
}
#######################################
# Runs "flink run" with the given arguments, tees its output to ./submit_log,
# and records the outcome (job ID and/or message) in the termination log.
# Expects errexit to be off so a failed submission is still reported.
# Arguments:
#   $@ - arguments passed through to "flink run"
# Returns:
#   0 when a job ID was found and flink exited 0; flink's exit code when a
#   job ID was found but flink failed; 1 when no job ID was found.
#######################################
function submit_job() {
  local -r job_id_indicator="Job has been submitted with JobID"
  local job_id=""

  # Submit job and extract the job ID
  echo "/opt/flink/bin/flink run $*" | tee -a submit_log
  /opt/flink/bin/flink run "$@" 2>&1 | tee -a submit_log
  # flink's own status, independent of whether "pipefail" is set.
  local -r job_exit_code="${PIPESTATUS[0]}"

  # First non-empty word after the indicator. It is printed via "%s", never
  # used as a printf format, and multiple matches are not concatenated.
  job_id=$(awk -v indicator="${job_id_indicator}" '
    {
      pos = index($0, indicator)
      if (pos == 0) next
      split(substr($0, pos + length(indicator)), fields, " ")
      if (fields[1] != "") {
        printf "%s", fields[1]
        exit
      }
    }' submit_log)

  # Write result as YAML format to pod termination-log.
  # On failure (no job ID), write log only.
  if [[ -z "${job_id}" ]]; then
    if ((job_exit_code == 0)); then
      write_term_log_msg "The submitter finished successfully but there is no job id. Did you forget to execute the pipeline?" "submit_log"
    else
      write_term_log_msg "Failed to submit." "submit_log"
    fi
    return 1
  fi

  # write job ID if there is one (write_term_log returns non-zero on success)
  write_term_log "jobID: ${job_id}" || true
  # check the job's exit code
  if ((job_exit_code != 0)); then
    write_term_log_msg "Job failed with a non-zero exit code: ${job_exit_code}" "submit_log"
    return "${job_exit_code}"
  fi
  # On success, write log
  write_term_log_msg "Successfully submitted!" "submit_log"
  return 0
}
#######################################
# Entry point: waits for the JobManager, then submits the job and exits with
# the submission's status (exit 1 if the JobManager never became ready).
# Arguments:
#   $@ - arguments forwarded to submit_job (and from there to "flink run")
#######################################
function main() {
  echo -e "---------- Checking job manager status ----------"
  check_jm_ready || exit 1

  echo -e "\n---------- Submitting job ----------"
  # errexit is switched off around the submission so that a non-zero status
  # reaches the exit below instead of aborting the script part-way.
  set +e
  submit_job "$@"
  local -r submit_status=$?
  set -e
  exit "${submit_status}"
}
# Run the entry point with the script's arguments (forwarded to "flink run").
main "$@"
`