in server/src/main/java/com/epam/aidial/core/server/log/GfLogStore.java [272:336]
/**
 * Assembles a buffered streaming response, made of {@code data:}-prefixed JSON chunks, into a single
 * {@code chat.completion} JSON document.
 *
 * <p>Merge rules:
 * <ul>
 *   <li>{@code usage}, {@code statistics} and {@code choices} are accumulated over all chunks with
 *       {@code MergeChunks.merge} and cleaned with {@code MergeChunks.removeIndices};</li>
 *   <li>{@code model} is taken from the first chunk that carries it;</li>
 *   <li>{@code system_fingerprint} is taken from the last chunk that carries it;</li>
 *   <li>{@code id} and {@code created} are taken from the last parsed chunk.</li>
 * </ul>
 * Parsing stops at the first chunk that starts with {@code [DONE]}.
 *
 * @param response raw body of the streaming response; may be {@code null}
 * @return the assembled JSON as a string; {@code null} when {@code response} is {@code null};
 *         {@code "{}"} when no chunk was found or the response could not be assembled
 */
static String assembleStreamingResponse(@Nullable Buffer response) {
    if (response == null) {
        return null;
    }
    // Decode explicitly as UTF-8: the payload is JSON, and the single-argument Scanner constructor would
    // fall back to the platform default charset, which corrupts non-ASCII text on non-UTF-8 JVMs.
    try (Scanner scanner = new Scanner(new ByteBufInputStream(response.getByteBuf()), "UTF-8")) {
        ObjectNode last = null;
        JsonNode usage = null;
        JsonNode statistics = null;
        JsonNode systemFingerprint = null;
        JsonNode model = null;
        JsonNode choices = null;
        // each chunk is separated by one or multiple new lines with the prefix: 'data:' (except the first chunk)
        // chunks may contain `data:` inside chunk data, which may lead to incorrect parsing
        scanner.useDelimiter("(^data: *|\n+data: *)");
        while (scanner.hasNext()) {
            String chunk = scanner.next();
            if (chunk.startsWith("[DONE]")) {
                // end-of-stream marker: nothing after it is parsed
                break;
            }
            // a chunk that is not a JSON object fails the cast and is handled by the catch block below
            ObjectNode tree = (ObjectNode) ProxyUtil.MAPPER.readTree(chunk);
            usage = MergeChunks.merge(usage, tree.get("usage"));
            statistics = MergeChunks.merge(statistics, tree.get("statistics"));
            JsonNode chunkFingerprint = tree.get("system_fingerprint");
            if (chunkFingerprint != null) {
                // the most recent fingerprint wins
                systemFingerprint = chunkFingerprint;
            }
            if (model == null && tree.get("model") != null) {
                // the first model seen wins
                model = tree.get("model");
            }
            last = tree;
            choices = MergeChunks.merge(choices, tree.get("choices"));
        }
        if (last == null) {
            log.warn("no chunk is found in streaming response");
            return "{}";
        }
        ObjectNode result = ProxyUtil.MAPPER.createObjectNode();
        result.set("id", last.get("id"));
        result.put("object", "chat.completion");
        result.set("created", last.get("created"));
        result.set("model", model);
        if (usage != null) {
            MergeChunks.removeIndices(usage);
            result.set("usage", usage);
        }
        if (statistics != null) {
            MergeChunks.removeIndices(statistics);
            result.set("statistics", statistics);
        }
        if (systemFingerprint != null) {
            result.set("system_fingerprint", systemFingerprint);
        }
        if (choices != null) {
            MergeChunks.removeIndices(choices);
            result.set("choices", choices);
        }
        return ProxyUtil.convertToString(result);
    } catch (Throwable e) {
        // best effort: assembling is used for logging only, so any failure degrades to an empty JSON object
        log.warn("Can't assemble streaming response", e);
        return "{}";
    }
}