def create_clip()

in deploy/docker/cp-tools/research/cellprofiler-web-api/app/src/hcs_clip.py [0:0]


def create_clip(params):
    """Render a video clip from HCS preview images and report the elapsed time.

    Values read from ``params`` (optional unless marked required):
      * ``format``    - clip file extension, ``'.webm'`` when absent.
      * ``codec``     - video codec; when absent, ``'mpeg4'`` is used for
                        ``'.mp4'`` clips and ``None`` otherwise.
      * ``fps``       - frames per second for the whole clip, ``1`` when absent.
      * ``duration``  - duration of a single frame, ``1`` when absent; must be >= 1.
      * ``sequenceId``- forwarded to ``parse_hcs``; ``None`` when absent.
      * ``byTime``    - non-zero (default ``1``): one frame per time point for
                        the z-plane named by ``pointId``; ``0``: one frame per
                        z-plane for the time point named by ``pointId``.
      * ``pointId``   - z-plane id or time point id (see ``byTime``),
                        ``'1'`` when absent.
      * ``path``      - required; input path, normalised via ``prepare_input_path``.
      * ``byField``   - required; ``1``: clip for a well field, ``0``: clip for a well.
      * ``cell``      - required; well or well field id.
    ``params`` is additionally handed to ``get_selected_channels``.

    Returns:
        A ``(clip_name, elapsed_seconds)`` tuple; the elapsed time is rounded
        to two decimal places.

    Raises:
        RuntimeError: if ``duration`` is below 1, if ``pointId`` is not a known
            z-plane (``byTime`` mode), or if ``pointId`` is not a time point of
            one of the sequences (z-plane mode).
    """
    started_at = time.time()

    # --- clip settings -----------------------------------------------------
    clip_format = params.get('format', '.webm')
    if 'codec' in params:
        codec = params.get('codec')
    elif clip_format == '.mp4':
        codec = 'mpeg4'
    else:
        codec = None
    fps = int(params.get('fps', 1))
    if 'duration' in params:
        duration = float(params.get('duration'))
    else:
        duration = 1
    if duration < 1:
        raise RuntimeError('Duration should be >= 1')
    sequence_id = params.get('sequenceId')
    by_time = int(params.get('byTime', 1))
    point_id = params.get('pointId', '1')

    # --- required selection parameters -------------------------------------
    hcs_path = prepare_input_path(HCSManager.get_required_field(params, 'path'))
    by_field = int(HCSManager.get_required_field(params, 'byField'))
    cell = int(HCSManager.get_required_field(params, 'cell'))

    # --- locate the preview data and its index file ------------------------
    preview_dir, sequences = parse_hcs(hcs_path, sequence_id)
    preview_dir = prepare_input_path(preview_dir)
    index_path = os.path.join(preview_dir, 'Index.xml')
    if not os.path.isfile(index_path):
        # Fall back to the lower-case file name.
        index_path = os.path.join(preview_dir, 'index.xml')
    channels, planes = get_planes_and_channels(index_path)
    if by_time and point_id not in planes:
        raise RuntimeError('Incorrect Z plane id [{}]'.format(point_id))
    selected_channels = get_selected_channels(params, channels)

    # Page attribute that tells frames apart: time point when walking through
    # time, z-plane when walking through planes.
    frame_key = 'time_point' if by_time else 'plane'

    frames = []
    frame_durations = []
    for seq, timepoints in sequences.items():
        if not by_time and point_id not in timepoints:
            raise RuntimeError('Incorrect timepoint id [{}]'.format(point_id))
        ome_tiff_path, offsets_path = get_path(preview_dir, seq, by_field)
        pages = get_pages(list(selected_channels), point_id, planes, len(channels), by_time, timepoints, cell)
        set_offsets(offsets_path, pages)
        read_images(ome_tiff_path, pages)
        for frame_id in (timepoints if by_time else planes):
            # Colour every selected channel of this frame, then merge them
            # into a single image.
            layers = []
            for page in _get_items(pages.values(), frame_key, frame_id):
                channel_id = page['channel_id']
                layers.append(color_image(page['image'], selected_channels[channel_id]))
            frames.append(np.array(merge_hcs_channels(layers)))
            frame_durations.append(duration)

    # --- encode ------------------------------------------------------------
    clip_name = get_clip_name(by_field, cell, clip_format, sequence_id, point_id, by_time)
    video = ImageSequenceClip(frames, durations=frame_durations)
    video.write_videofile(clip_name, codec=codec, fps=fps)
    finished_at = time.time()
    return clip_name, round((finished_at - started_at), 2)