Планировщик (шаблон проектирования)

Планировщик
Scheduler
Описан в Design Patterns Нет

Планировщик (англ. scheduler) — параллельный шаблон проектирования, обеспечивающий механизм реализации политики планирования, но при этом не зависящий ни от одной конкретной политики. Управляет порядком, в соответствии с которым потокам предстоит выполнить последовательный код , используя для этого объект, который явным образом задаёт последовательность ожидающих потоков.

Мотивы

  • Несколько потоков могут потребовать доступ к ресурсу одновременно, и только один поток в какой-то момент времени может осуществить доступ к ресурсу.
  • Согласуясь с требованиями программы, потоки должны осуществлять доступ к ресурсу в определенном порядке.

Пример реализации

Пример C#

using System;

namespace Digital_Patterns.Concurrency.Sheduler
{
    class Printer
    {
        private static Int32 mID = 0;

        private Scheduler _scheduler = new Scheduler();

        public void Print(JournalEntry journalEntry)
        {
            Int32 id = ++mID;
            try
            {
                Console.WriteLine(String.Format(@"{0}: enter scheduler", id));
                // вызов не выполнится до тех пор, пока объект Scheduler не решит,
                // что подошла очередь распечатать этот объект JournalEntry
                _scheduler.Enter(journalEntry);
                Console.WriteLine(String.Format(@"{0}: start printing", id));
                try
                {
                    //TODO Something
                    journalEntry.Do(id);
                }
                finally
                {
                    // вызов метода Done говорит Scheduler о том, что объект JournalEntry
                    // распечатан, и может подойти очередь вывода на печать другого объекта
                    // JournalEntry
                    _scheduler.Done();
                    Console.WriteLine(String.Format(@"{0}: done scheduler", id));
                }
            }
            catch (Exception) {}
        }
    }
}


using System;
using System.Collections.Generic;
using System.Threading;

namespace Digital_Patterns.Concurrency.Sheduler
{
    /// <summary>
    /// Экземпляры классов в этой роли управляют обработкой объектов Request <see cref="JournalEntry"/>,
    /// выполняемой объектом Processor <see cref="Printer"/>. Чтобы быть независимыми от типов
    /// запросов, класс <see cref="Scheduler"/> не должен ничего знать об управляемом им классе Request.
    /// Вместо этого он осуществляет доступ к объектам Request через реализуемый ими интерфейс <see cref="ISchedulerOrdering"/>
    /// </summary>
    class Scheduler
    {
        /// <summary>
        /// Объект синхронизации потоков
        /// </summary>
        private AutoResetEvent _event = new AutoResetEvent(false);

        /// <summary>
        /// Устанавливается в null, если управляемый объектом Scheduler ресурс не занят.
        /// </summary>
        private Thread _runningThread;

        /// <summary>
        /// Потоки и их запросы ожидающие выполнения
        /// </summary>
        private Dictionary<Thread, ISchedulerOrdering> _waiting = new Dictionary<Thread, ISchedulerOrdering>();

        /// <summary>
        /// Метод <see cref="Enter"/> вызывается перед тем, как поток начнет использовать управляемый ресурс.
        /// Метод не выполняется до тех пор пока управляемый ресурс не освободится и объект <see cref="Sheduler"/>
        /// не примет решение, что подошла очередь выполнения этого запроса
        /// </summary>
        /// <param name="s"></param>
        public void Enter(ISchedulerOrdering s)
        {
            var thisThread = Thread.CurrentThread;

            lock(this)
            {
                // Определяем не занят ли планировщик
                if(_runningThread == null)
                {
                    // Немедленно начинаем выполнение поступившего запроса
                    _runningThread = thisThread;
                    return;
                }
                _waiting.Add(thisThread, s);
            }
            
            lock(thisThread)
            {
                //Блокируем поток до тех пор, пока планировщик не решит сделать его текущим
                while(thisThread != _runningThread)
                {
                    _event.WaitOne();
                    _event.Set();   // даем возможность другим потокам проверить своё состояние
                    Thread.Sleep(1);
                }
                _event.Reset();
            }

            lock (this)
            {
                _waiting.Remove(thisThread);
            }
        }

        /// <summary>
        /// Вызов метода <see cref="Done"/> указывает на то, что текущий поток завершил работу
        /// и управляемый ресурс освободился
        /// </summary>
        public void Done()
        {
            lock (this)
            {
                if (_runningThread != Thread.CurrentThread)
                    throw new ThreadStateException(@"Wrong Thread");

                Int32 waitCount = _waiting.Count;
                if (waitCount <= 0)
                {
                    _runningThread = null;
                }
                else if (waitCount == 1)
                {
                    _runningThread = _waiting.First().Key;
                    _waiting.Remove(_runningThread);
                    _event.Set();
                }
                else
                {
                    var next = _waiting.First();
                    foreach (var wait in _waiting)
                    {
                        if(wait.Value.ScheduleBefore(next.Value))
                        {
                            next = wait;
                        }
                    }

                    _runningThread = next.Key;
                    _event.Set();
                }
            }
        }
    }

    /// <summary>
    /// Вспомогательный класс
    /// </summary>
    static partial class ConvertTo
    {
        /// <summary>
        /// Получить первый элемент коллекции
        /// </summary>
        /// <param name="collection"></param>
        /// <returns></returns>
        public static KeyValuePair<Thread, ISchedulerOrdering> First(this Dictionary<Thread, ISchedulerOrdering> collection)
        {
            foreach (var item in collection)
            {
                return item;
            }
            throw new ArgumentException();
        }
    }

}


using System;

namespace Digital_Patterns.Concurrency.Sheduler
{
    /// <summary>
    /// Если несколько операций ожидают доступа к ресурсу, класс<see cref="Scheduler"/> использует
    /// данный интерфейс для определения порядка выполнения операций.
    /// </summary>
    interface ISchedulerOrdering
    {
        Boolean ScheduleBefore(ISchedulerOrdering s);
    }
}


using System;
using System.Threading;

namespace Digital_Patterns.Concurrency.Sheduler
{
    /// <summary>
    /// Примерный код класса <see cref="JournalEntry"/>, который должен быть
    /// распечатан классом <see cref="Printer"/>
    /// </summary>
    class JournalEntry : ISchedulerOrdering
    {
        private static DateTime mTime = DateTime.Now;

        private DateTime _time;

        /// <summary>
        /// Возвращает время создания этого объекта
        /// </summary>
        public DateTime Time { get { return _time; } }

        private String _msg;

        public JournalEntry(String msg)
        {
            mTime = mTime.AddSeconds(1);
            _time = mTime;
            _msg = msg;
        }

        public void Do(Int32 id)
        {
            Console.WriteLine(String.Format(@"{0}: Start doing : {1} : {2}", id, _time, _msg));
            Thread.Sleep(1000);
            Console.WriteLine(String.Format(@"{0}: Finish do : {1} : {2}", id, _time, _msg));
        }

        /// <summary>
        /// Возвращает true, если данный запрос должен
        /// обрабатываться перед этим запросом.
        /// </summary>
        /// <param name="s"></param>
        /// <returns></returns>
        public Boolean ScheduleBefore(ISchedulerOrdering s)
        {
            if(s is JournalEntry)
            {
                var otherJournalEntry = (JournalEntry) s;
                return (this.Time < otherJournalEntry.Time);
            }
            return false;
        }
    }
}


using System;
using System.Threading;

namespace Digital_Patterns.Concurrency.Sheduler
{
    public class Example01
    {
        private Printer _printer;

        public void Run()
        {
            Console.WriteLine(@"Press any key for start, and press again for finish");
            Console.ReadKey();
            
            _printer = new Printer();
            new Thread(Thread1).Start();
            new Thread(Thread2).Start();
            new Thread(Thread3).Start();

            Console.ReadKey();
        }

        private void Thread1()
        {
            var msg1 = new JournalEntry(@"Buy toll 5.45 USD");
            var msg2 = new JournalEntry(@"Buy candy 1.05 USD");
            var msg3 = new JournalEntry(@"Buy chocolate 3.25 USD");
            _printer.Print(msg1);
            _printer.Print(msg2);
            _printer.Print(msg3);
        }

        private void Thread2()
        {
            var msg4 = new JournalEntry(@"Buy postcard 2.05 USD");
            var msg5 = new JournalEntry(@"Buy gerland 37.78 USD");
            _printer.Print(msg4);
            _printer.Print(msg5);
        }

        private void Thread3()
        {
            var msg6 = new JournalEntry(@"Buy ball 30.06 USD");
            var msg7 = new JournalEntry(@"Buy pipe 1.83 USD");
            _printer.Print(msg6);
            _printer.Print(msg7);
        }
    }
}


using System;
using Digital_Patterns.Concurrency.Sheduler;

namespace Digital_Patterns
{
    class Program
    {
        static void Main(string[] args)
        {
            new Example01().Run();

            Console.WriteLine(@"Press any key for end");
            Console.ReadKey();
        }
    }
}

Ссылки

  • Mark Grand. Patterns in Java Volume 1: A Catalog of Reusable Design Patterns Illustrated with UML. — Wiley & Sons, 1998. — 480 с. — ISBN 0471258393. (см. синопсис  (англ.))