flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkFactory.java [110:151]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
	private void validate(Elasticsearch7Configuration config, Configuration originalConfiguration) {
		config.getFailureHandler(); // checks if we can instantiate the custom failure handler
		config.getHosts(); // validate hosts
		validate(
			config.getIndex().length() >= 1,
			() -> String.format("'%s' must not be empty", INDEX_OPTION.key()));
		int maxActions = config.getBulkFlushMaxActions();
		validate(
			maxActions == -1 || maxActions >= 1,
			() -> String.format(
				"'%s' must be at least 1. Got: %s",
				BULK_FLUSH_MAX_ACTIONS_OPTION.key(),
				maxActions)
		);
		long maxSize = config.getBulkFlushMaxByteSize();
		long mb1 = 1024 * 1024;
		validate(
			maxSize == -1 || (maxSize >= mb1 && maxSize % mb1 == 0),
			() -> String.format(
				"'%s' must be in MB granularity. Got: %s",
				BULK_FLASH_MAX_SIZE_OPTION.key(),
				originalConfiguration.get(BULK_FLASH_MAX_SIZE_OPTION).toHumanReadableString())
		);
		validate(
			config.getBulkFlushBackoffRetries().map(retries -> retries >= 1).orElse(true),
			() -> String.format(
				"'%s' must be at least 1. Got: %s",
				BULK_FLUSH_BACKOFF_MAX_RETRIES_OPTION.key(),
				config.getBulkFlushBackoffRetries().get())
		);
		if (config.getUsername().isPresent() && !StringUtils.isNullOrWhitespaceOnly(config.getUsername().get())) {
			validate(
				config.getPassword().isPresent() && !StringUtils.isNullOrWhitespaceOnly(config.getPassword().get()),
				() -> String.format(
					"'%s' and '%s' must be set at the same time. Got: username '%s' and password '%s'",
					USERNAME_OPTION.key(),
					PASSWORD_OPTION.key(),
					config.getUsername().get(),
					config.getPassword().orElse("")
				));
		}
	}
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkFactory.java [111:152]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
	private void validate(Elasticsearch6Configuration config, Configuration originalConfiguration) {
		config.getFailureHandler(); // checks if we can instantiate the custom failure handler
		config.getHosts(); // validate hosts
		validate(
			config.getIndex().length() >= 1,
			() -> String.format("'%s' must not be empty", INDEX_OPTION.key()));
		int maxActions = config.getBulkFlushMaxActions();
		validate(
			maxActions == -1 || maxActions >= 1,
			() -> String.format(
				"'%s' must be at least 1. Got: %s",
				BULK_FLUSH_MAX_ACTIONS_OPTION.key(),
				maxActions)
		);
		long maxSize = config.getBulkFlushMaxByteSize();
		long mb1 = 1024 * 1024;
		validate(
			maxSize == -1 || (maxSize >= mb1 && maxSize % mb1 == 0),
			() -> String.format(
				"'%s' must be in MB granularity. Got: %s",
				BULK_FLASH_MAX_SIZE_OPTION.key(),
				originalConfiguration.get(BULK_FLASH_MAX_SIZE_OPTION).toHumanReadableString())
		);
		validate(
			config.getBulkFlushBackoffRetries().map(retries -> retries >= 1).orElse(true),
			() -> String.format(
				"'%s' must be at least 1. Got: %s",
				BULK_FLUSH_BACKOFF_MAX_RETRIES_OPTION.key(),
				config.getBulkFlushBackoffRetries().get())
		);
		if (config.getUsername().isPresent() && !StringUtils.isNullOrWhitespaceOnly(config.getUsername().get())) {
			validate(
				config.getPassword().isPresent() && !StringUtils.isNullOrWhitespaceOnly(config.getPassword().get()),
				() -> String.format(
					"'%s' and '%s' must be set at the same time. Got: username '%s' and password '%s'",
					USERNAME_OPTION.key(),
					PASSWORD_OPTION.key(),
					config.getUsername().get(),
					config.getPassword().orElse("")
				));
		}
	}
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



