FixAntenna/NetCore/FixEngine/Scheduler/SessionTaskScheduler.cs (169 lines of code) (raw):
// Copyright (c) 2022 EPAM Systems
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
using System;
using System.Collections.Generic;
using System.Linq;
using Epam.FixAntenna.NetCore.Common.Logging;
using Epam.FixAntenna.NetCore.FixEngine.Scheduler.Tasks;
using Quartz;
using Quartz.Impl;
using Quartz.Impl.Matchers;
using Quartz.Simpl;
namespace Epam.FixAntenna.NetCore.FixEngine.Scheduler
{
internal class SessionTaskScheduler
{
private static readonly ILog Log = LogFactory.GetLog(typeof(SessionTaskScheduler));
private readonly IScheduler _scheduler;
private readonly SessionParameters _sessionParameters;
// We want a separate scheduler per session, so we need to use a unique per session scheduler name.
// SessionId is not good for that as tests can create several sessions with the same Id.
private readonly string _schedulerName = Guid.NewGuid().ToString();
internal SessionTaskScheduler(SessionParameters sessionParameters)
{
_sessionParameters = sessionParameters;
DirectSchedulerFactory.Instance.CreateScheduler(_schedulerName, _schedulerName, new DefaultThreadPool(), new RAMJobStore());
_scheduler = DirectSchedulerFactory.Instance.GetScheduler(_schedulerName).Result;
_scheduler = _scheduler ?? throw new InvalidOperationException("Cannot create scheduler.");
_scheduler.Start();
}
internal bool IsShutdown() => _scheduler.IsShutdown;
internal void ScheduleCronTask<T>(string pipedCronExpression, TimeZoneInfo timeZone) where T : AbstractSessionTask
{
ScheduleCronJob<T>(pipedCronExpression, timeZone);
}
internal void ScheduleHeartbeat(TimeSpan checkHeartbeatInterval)
{
var jobKey = CreateJobKey<InactivityCheckTask>();
DescheduleJob(jobKey);
var job = CreateJob<InactivityCheckTask>(jobKey);
var trigger = TriggerBuilder.Create()
.WithSimpleSchedule(s => s
.WithInterval(checkHeartbeatInterval)
.WithMisfireHandlingInstructionIgnoreMisfires()
.RepeatForever())
.Build();
var nextRun = _scheduler.ScheduleJob(job, trigger).Result;
Log.Trace($"{nameof(InactivityCheckTask)} will run at {nextRun:O}");
}
internal void ScheduleTestRequest(TimeSpan checkTestRequestInterval)
{
var jobKey = CreateJobKey<TestRequestTask>();
DescheduleJob(jobKey);
var job = CreateJob<TestRequestTask>(jobKey);
var trigger = TriggerBuilder.Create()
.StartAt(DateTimeOffset.Now.AddSeconds(1))
.WithSimpleSchedule(s => s
.WithInterval(checkTestRequestInterval)
.WithMisfireHandlingInstructionIgnoreMisfires()
.RepeatForever())
.Build();
var nextRun = _scheduler.ScheduleJob(job, trigger).Result;
Log.Trace($"{nameof(TestRequestTask)} will run at {nextRun:O}");
}
internal void ScheduleSeqReset(TimeSpan resetTime, TimeZoneInfo resetTimeZone)
{
var jobKey = CreateJobKey<ResetSeqNumTask>();
DescheduleJob(jobKey);
var job = CreateJob<ResetSeqNumTask>(jobKey);
var trigger = TriggerBuilder.Create()
.WithDailyTimeIntervalSchedule(s => s
.WithIntervalInHours(24)
.StartingDailyAt(new TimeOfDay(resetTime.Hours, resetTime.Minutes, resetTime.Seconds))
.InTimeZone(resetTimeZone))
.Build();
var nextRun = _scheduler.ScheduleJob(job, trigger).Result;
Log.Trace($"{nameof(ResetSeqNumTask)} will run at {nextRun:O}");
}
internal bool IsTaskScheduled<T>() where T : AbstractSessionTask
{
var jobKey = CreateJobKey<T>();
return JobExists(jobKey);
}
internal IEnumerable<string> GetCronExpressionsForScheduledCronTask<T>() where T : AbstractSessionTask
{
var jobKey = CreateJobKey<T>();
if (!JobExists(jobKey)) return Enumerable.Empty<string>();
var triggers = _scheduler.GetTriggersOfJob(jobKey).Result;
var result = new List<string>();
foreach (var trigger in triggers)
{
if (trigger is ICronTrigger cronTrigger)
{
result.Add(cronTrigger.CronExpressionString);
}
}
return result;
}
internal void DescheduleTask<T>() where T : AbstractSessionTask
{
var jobKey = CreateJobKey<T>();
DescheduleJob(jobKey);
}
internal void DescheduleAllTasks()
{
var jobKeys = _scheduler.GetJobKeys(GroupMatcher<JobKey>.AnyGroup()).Result;
_scheduler.DeleteJobs(jobKeys).Wait();
}
internal void Shutdown()
{
_scheduler.Shutdown(false).Wait();
}
private JobKey CreateJobKey<T>() where T : IJob
{
return new JobKey(typeof(T).Name, _schedulerName);
}
private bool JobExists(JobKey jobKey)
{
return _scheduler.CheckExists(jobKey).Result;
}
private void ScheduleCronJob<T>(string pipedCronExpression, TimeZoneInfo timeZone)
where T: IJob
{
var jobKey = CreateJobKey<T>();
DescheduleJob(jobKey);
var job = CreateJob<T>(jobKey);
var triggers = MultipartCronExpression
.ExtractCronExpressions(pipedCronExpression)
.Select(cronExpression => TriggerBuilder.Create()
.WithCronSchedule(
cronExpression,
s => s.InTimeZone(timeZone)
)
.Build()
)
.ToArray();
// Quartz.net does not allow scheduling for cron expressions that do not have a fire date in the future.
// Thus, to avoid throwing an exception, we filter them out
var triggersToFire = new List<ICronTrigger>(triggers.Length);
foreach (var trigger in triggers)
{
var cronTrigger = (ICronTrigger)trigger;
if (CanCronTriggerFire(cronTrigger))
{
triggersToFire.Add(cronTrigger);
}
else
{
Log.Warn($"{cronTrigger.CronExpressionString} will not be triggered");
}
}
_scheduler.ScheduleJob(job, triggersToFire, true).Wait();
}
private bool CanCronTriggerFire(ICronTrigger cronTrigger)
{
return
cronTrigger.CronExpressionString != null &&
cronTrigger.GetFireTimeAfter(cronTrigger.StartTimeUtc.AddSeconds(-1)) != null;
}
private IJobDetail CreateJob<T>(JobKey jobKey) where T: IJob
{
return JobBuilder.Create<T>()
.WithIdentity(jobKey)
.UsingJobData("SessionId", _sessionParameters.SessionId.ToString())
.Build();
}
private void DescheduleJob(JobKey key)
{
if (JobExists(key))
{
_scheduler.DeleteJob(key).Wait();
}
}
}
}