in deploy/docker/cp-tools/research/cellprofiler-web-api/app/src/hcs_clip.py [0:0]
def create_clip(params):
    """Build a video clip from HCS preview images and write it to disk.

    Recognised ``params`` keys:

    * ``format``     - clip file format, defaults to ``'.webm'``.
    * ``codec``      - video codec; when absent, ``'mpeg4'`` is used for
      ``'.mp4'`` and ``None`` otherwise.
    * ``fps``        - frames per second for the whole clip, defaults to 1.
    * ``duration``   - duration of a single frame, defaults to 1 and must
      be >= 1.
    * ``sequenceId`` - optional sequence id passed to ``parse_hcs``.
    * ``byTime``     - 1 (default): video over all timepoints for the
      z-plane given by ``pointId``; 0: video over all z-planes for the
      timepoint given by ``pointId``.
    * ``pointId``    - z-plane id (``byTime`` = 1) or timepoint id
      (``byTime`` = 0), defaults to ``'1'``.
    * ``path``       - required, HCS input path.
    * ``byField``    - required; 1: clip for a well field, 0: clip for a well.
    * ``cell``       - required, well or well field id.

    Returns a tuple ``(clip_name, elapsed)`` where ``elapsed`` is the wall
    clock time spent in this call, rounded to two decimals.

    Raises ``RuntimeError`` when ``duration`` is below 1 or when ``pointId``
    is not a known z-plane / timepoint. Missing required keys are reported
    by ``HCSManager.get_required_field`` (exact exception not visible here).
    """
    started_at = time.time()

    # --- output settings -------------------------------------------------
    clip_format = params.get('format', '.webm')
    fallback_codec = 'mpeg4' if clip_format == '.mp4' else None
    codec = params.get('codec', fallback_codec)
    # fps applies to the whole clip
    fps = int(params.get('fps', 1))
    # per-frame duration; the default is deliberately left as the int 1
    if 'duration' in params:
        duration = float(params.get('duration'))
    else:
        duration = 1
    if duration < 1:
        raise RuntimeError('Duration should be >= 1')

    # --- what to render --------------------------------------------------
    sequence_id = params.get('sequenceId', None)
    # by_time truthy: iterate timepoints at a fixed z-plane;
    # by_time falsy: iterate z-planes at a fixed timepoint
    by_time = int(params.get('byTime', 1))
    # z-plane id when by_time is set, timepoint id otherwise
    point_id = params.get('pointId', '1')
    path = prepare_input_path(HCSManager.get_required_field(params, 'path'))
    # by_field truthy: a single well field; falsy: the whole well
    by_field = int(HCSManager.get_required_field(params, 'byField'))
    # id of the well or of the well field
    cell = int(HCSManager.get_required_field(params, 'cell'))

    # --- locate HCS metadata ---------------------------------------------
    preview_dir, sequences = parse_hcs(path, sequence_id)
    preview_dir = prepare_input_path(preview_dir)
    index_path = os.path.join(preview_dir, 'Index.xml')
    if not os.path.isfile(index_path):
        # fall back to the lower-case file name
        index_path = os.path.join(preview_dir, 'index.xml')
    # channels and z-planes described by the index file
    channels, planes = get_planes_and_channels(index_path)
    if by_time and point_id not in planes:
        raise RuntimeError('Incorrect Z plane id [{}]'.format(point_id))
    selected_channels = get_selected_channels(params, channels)

    # --- assemble frames -------------------------------------------------
    # page attribute used to group pages into a single frame
    frame_key = 'time_point' if by_time else 'plane'
    images = []
    frame_durations = []
    for seq, timepoints in sequences.items():
        if not by_time and point_id not in timepoints:
            raise RuntimeError('Incorrect timepoint id [{}]'.format(point_id))
        ome_tiff_path, offsets_path = get_path(preview_dir, seq, by_field)
        pages = get_pages(list(selected_channels.keys()), point_id, planes, len(channels), by_time, timepoints,
                          cell)
        set_offsets(offsets_path, pages)
        read_images(ome_tiff_path, pages)
        for frame_id in (timepoints if by_time else planes):
            # colour every selected channel of this frame, then merge them
            layers = []
            for page in _get_items(pages.values(), frame_key, frame_id):
                channel_id = page['channel_id']
                layers.append(color_image(page['image'], selected_channels[channel_id]))
            images.append(np.array(merge_hcs_channels(layers)))
            frame_durations.append(duration)

    # --- encode ----------------------------------------------------------
    clip_name = get_clip_name(by_field, cell, clip_format, sequence_id, point_id, by_time)
    ImageSequenceClip(images, durations=frame_durations).write_videofile(clip_name, codec=codec, fps=fps)
    finished_at = time.time()
    return clip_name, round((finished_at - started_at), 2)