in web/frontend/src/app/pages/streams/modules/monitor-log/services/monitor-log-grid-data.service.ts [39:156]
public getSubscription(url: string, dataObj: WSLiveModel, schemaMap: SchemaTypesMap) {
const FROM_DATE = new Date();
if (this.subscription$) {
this.subscription$.complete();
}
this.subscription$ = new Subject();
const stompHeaders: StompHeaders = {};
Object.keys(dataObj).forEach((key) => {
if (typeof dataObj[key] !== 'object') {
stompHeaders[key] = dataObj[key] + '';
} else if (dataObj[key] && typeof dataObj[key] === 'object') {
stompHeaders[key] = JSON.stringify(dataObj[key]);
}
});
stompHeaders['fromTimestamp'] = FROM_DATE.toISOString();
// if (dataObj.space) {
//
// }
this.appStore
.pipe(select(getActiveOrFirstTab))
.pipe(
take(1),
switchMap((activeTab: TabModel) => {
let params = {},
httpUrl;
const filter = activeTab.filter || {};
if (activeTab.symbol) {
httpUrl = `${encodeURIComponent(activeTab.stream)}/${encodeURIComponent(
activeTab.symbol,
)}`;
} else {
httpUrl = `${encodeURIComponent(activeTab.stream)}`;
}
httpUrl += '/select';
params = {
...params,
offset: 0,
rows: 100,
reverse: true,
};
Object.keys(filter).forEach((key) => {
if (
key != null &&
key !== 'filter_symbols' &&
key !== 'filter_types' &&
// && filter[key] !== 'tabName'
key !== 'filter_date_format' &&
key !== 'filter_time_format'
) {
params[key] = filter[key];
} else if (key === 'filter_symbols') {
params['symbols'] = filter[key];
} else if (key === 'filter_types') {
params['types'] = filter[key];
}
});
if (!params['from']) params['from'] = FROM_DATE.toISOString();
if (typeof activeTab.space === 'string') {
params['space'] = encodeURIComponent(activeTab.space);
}
return this.httpClient.post<StreamDetailsModel[]>(httpUrl, params);
}),
map((data) => {
this.rowsStore = data
.map((row) => new StreamDetailsModel(row, schemaMap))
.sort((row1, row2) => {
const TIME2 = new Date(row2.timestamp).getTime(),
TIME1 = new Date(row1.timestamp).getTime();
if (TIME2 < TIME1) {
return -1;
} else if (TIME1 === TIME2) {
return 0;
}
return 1;
});
return {
data: this.rowsStore,
};
}),
tap((data) => this.subscription$.next(data)),
switchMap(() =>
this.wsService.watch(url, stompHeaders, {destination: url}).pipe(
withLatestFrom(this.appStore.select(getAppVisibility)),
filter(
([ws_data, app_is_visible]: [any, boolean]) =>
app_is_visible /* && !!(this.readyApi && this.readyApi.api)*/,
),
map(([ws_data]) => ws_data),
map((ws_data) =>
JSON.parse(ws_data.body).map((row) => new StreamDetailsModel(row, schemaMap)),
),
map((data: StreamDetailsModel[]) => {
this.rowsStore = [...this.rowsStore, ...data].sort((row1, row2) => {
const TIME2 = new Date(row2.timestamp).getTime(),
TIME1 = new Date(row1.timestamp).getTime();
if (TIME2 < TIME1) {
return -1;
} else if (TIME1 === TIME2) {
return 0;
}
return 1;
});
this.rowsStore.splice(100, this.rowsStore.length - 100);
return {
data: this.rowsStore,
newDataLength: Math.min(data.length, this.rowsStore.length),
};
}),
throttleTime(500),
takeUntil(this.destroy$),
),
),
)
.subscribe((data) => this.subscription$.next(data));
return this.subscription$.pipe(takeUntil(this.destroy$));
}