std.concurrency

Переместиться к: FiberScheduler · Generator · initOnce · LinkTerminated · locate · MailboxFull · MessageMismatch · OnCrowding · OwnerTerminated · ownerTid · PriorityMessageException · prioritySend · receive · receiveOnly · receiveTimeout · register · Scheduler · scheduler · send · setMaxMailboxSize · spawn · spawnLinked · thisTid · ThreadInfo · ThreadScheduler · Tid · TidMissingException · unregister · yield

Это низкоуровневый API обмена сообщениями, на котором могут быть построены более структурированные или ограничительные API. Общая идея состоит в том, что каждая пересылаемая сущность представлена общим типом дескриптора, называемым Tid, который позволяет отправлять сообщения в логические потоки, которые выполняются как в текущем процессе, так и во внешних процессах с использованием одного и того же интерфейса. Это важный аспект масштабируемости, поскольку он позволяет распределять компоненты программы по доступным ресурсам с незначительными изменениями в реальной реализации.
Логический поток - это контекст выполнения, который имеет свой собственный стек и который выполняется асинхронно по отношению к другим логическим потокам. Это могут быть preemptively scheduled потоки ядра, нити (совместные потоки в пользовательском пространстве) или некоторые другие концепции с аналогичным поведением.
Тип параллельного выполнения, используемый при создании логических потоков, определяется планировщиком (Scheduler), выбранным во время инициализации. В настоящее время поведение по умолчанию заключается в создании нового потока ядра для каждого порождающего вызова, но существуют другие планировщики, которые мультиплексируют нити внутри основного потока, или используют некоторую комбинацию этих двух подходов.

Краткий обзор:

import std.stdio;
import std.concurrency;

void spawnedFunc(Tid ownerTid)
{
    // Получить сообщение из потока-владельца.
    receive(
        (int i) { writeln("Received the number ", i);}
    );

    // Отправить обратно в поток-владелец сообщение об успехе.
    send(ownerTid, true);
}

void main()
{
    // Запустить функцию spawnedFunc в новом потоке.
    auto childTid = spawn(&spawnedFunc, thisTid);

    // Отправить число 42 в этот новый поток.
    send(childTid, 42);

    // Принять код результата. 
    auto wasSuccessful = receiveOnly!(bool);
    assert(wasSuccessful);
    writeln("Successfully printed number.");
}

Лицензия:
Boost License 1.0.
Авторы:
Sean Kelly, Alex Rønne Petersen, Martin Nowak

Исходный код: std/concurrency.d

class MessageMismatch: object.Exception;
Исключение, выбрасываемое шаблоном receiveOnly, если отправлено сообщение с типом, отличным от ожидаемого в принимающем потоке.
class OwnerTerminated: object.Exception;
Исключение, выбрасываемое шаблоном receive, если поток, породивший принимающий поток, был прекращён, и сообщений больше не существует.
class LinkTerminated: object.Exception;
Исключение, выбрасываемое, если связанный поток прекращён.

Переместиться к: message

class PriorityMessageException: object.Exception;
Исключение, выбрасываемое, если сообщение было отправлено в поток через std.concurrency.prioritySend, и у получателя нет обработчика сообщений такого типа.
Variant message;
Посланное сообщение.
class MailboxFull: object.Exception;
Исключение, выбрасываемое при переполнении почтового ящика (mailbox), если почтовый ящик настроен с помощью OnCrowding.throwException.
class TidMissingException: object.Exception;
Исключение, выбрасываемое в случаем отсутствия Tid, например, когда ownerTid не находит поток-владелец.

Переместиться к: toString

struct Tid;
Тип, используемый для представления логического потока.
void toString(scope void delegate(const(char)[]) sink);
Создаёт удобную строку для идентификации этого Tid. Это полезно только для того, чтобы увидеть, являются ли Tid'ы, которые в настоящее время выполняются, одинаковыми или разными, например, для ведения журнала или отладки. Возможна ситуация, при которой Tid, выполняемый в будущем, будет иметь тот же результат toString(), что и другой Tid, который уже завершён.
@property @safe Tid thisTid();
Возвращает Tid текущего потока.
@property Tid ownerTid();
Возвращает Tid потока, который породил текущий поток.
Исключения:
TidMissingException в случае остутсвия потока-владельца.
Tid spawn(F, T...)(F fn, T args)
if (isSpawnable!(F, T));
Запускает fn(args) в новом логическом потоке.
Выполняет переданную функцию в новом логическом потоке, представленном Tid. Вызывающий поток назначается владельцем нового потока. Когда поток-владелец прекращается, в новый поток будет отправлено сообщение OwnerTerminated, в результате чего будет вызвано исключение OwnerTerminated при вызове receive().
Параметры:
F fn Выполняемая функция.
T args Аргументы функции.
Возвращает:
Tid, представляющий новый логический поток.

Замечания: Аргументы args не должны иметь неразделямых псевдонимов. Другими словами, все аргументы, передаваемые в fn должны быть либо разделяемыми (shared), либо неизменяемыми (immutable), или не иметь косвенного доступа через указатель. Это необходимо для обеспечения изоляции между потоками.

Пример:

import std.stdio, std.concurrency;

void f1(string str)
{
    writeln(str);
}

void f2(char[] str)
{
    writeln(str);
}

void main()
{
    auto str = "Hello, world";

    // Работает: string является неизменяемой.
    auto tid1 = spawn(&f1, str);

    // Не сработает: char[] имеет изменяемый псевдоним.
    auto tid2 = spawn(&f2, str.dup);

    // Новый поток с анонимной функцией
    spawn({ writeln("This is so great!"); });
}

Tid spawnLinked(F, T...)(F fn, T args)
if (isSpawnable!(F, T));
Запускает fn(args) в логическом потоке и получает сообщение LinkTerminated при завершении операции.
Выполняет переданную функцию в новом логическом потоке, представленном Tid. Этот новый поток связан с вызывающим потоком, так что если либо он, либо вызывающий поток завершается, другому будет отправлено сообщение LinkTerminated, в результате чего будет выброшено исключение LinkTerminated в receive(). Также сохраняются отношения владелец-потомок от spawn(), поэтому, если связь между потоками нарушена, завершение потока-владельца всё равно приведёт к исключению OwnerTerminated, которое будет выброшено в receive().
Параметры:
F fn Выполняемая функция.
T args Аргументы функции.
Возвращает:
Tid, представляющий новый поток.
void send(T...)(Tid tid, T vals);
Помещает значения vals в качестве сообщения в конце очереди сообщений для потока tid.
Отправляет заданное значение в поток, представленный tid. Как и в случае с std.concurrency.spawn, T не должен иметь неразделяемых псевдонимов.
void prioritySend(T...)(Tid tid, T vals);
Помещает значения vals в качестве сообщения в начале очереди сообщений для потока tid.
Отправляет сообщение в tid, но помещает его в начало очереди сообщений потока tid, а не в конце. Эта функция обычно используется для экстренной связи, сигналах об исключительной ситуации и т.д.
void receive(T...)(T ops);
Получает сообщение из другого потока.
Получает сообщение из другого потока, или блокирует, если нет сообщений указанных типов. Эта функция работает по шаблону, сопоставляющему сообщение с набором делегатов и выполняющему первое найденное совпадение.
Если делегат, принимающий std.variant.Variant, включается в качестве последнего аргумента receive, он будет соответствовать любому сообщению, которое не было сопоставлено с раннее встретившимся делегатом. Если передано более одного аргумента, Variant будет содержать кортеж std.typecons.Tuple из всех отправленных значений.

Пример:

import std.stdio;
import std.variant;
import std.concurrency;

void spawnedFunction()
{
    receive(
        (int i) { writeln("Received an int."); },
        (float f) { writeln("Received a float."); },
        (Variant v) { writeln("Received some other type."); }
    );
}

void main()
{
     auto tid = spawn(&spawnedFunction);
     send(tid, 42);
}

receiveOnlyRet!T receiveOnly(T...)();
Получает сообщения только с аргументами типов T.
Исключения:
MessageMismatch, если получено сообщение из типов, отличных от T.
Возвращает:
Полученное сообщение. Если T.length больше единицы, сообщение будет упаковано в кортеж std.typecons.Tuple.

Пример:

import std.concurrency;

void spawnedFunc()
{
    auto msg = receiveOnly!(int, string)();
    assert(msg[0] == 42);
    assert(msg[1] == "42");
}

void main()
{
    auto tid = spawn(&spawnedFunc);
    send(tid, 42, "42");
}

bool receiveTimeout(T...)(Duration duration, T ops);
Попытается получить сообщение, но сдастся, если не появится соответсвия за время duration. Не будет ждать вообще, если переданное core.time.Duration отрицательно.
То же, что и receive, за исключением того, что вместо бесконечного ожидания сообщения, ожидает, пока не получит сообщение или пока не пройдёт переданное время core.time.Duration. Возвращает true, если сообщение получено, и false, если время ожидания закончилось.

Переместиться к: block · ignore · throwException

enum OnCrowding: int;
Эти типы поведения могут быть указаны на случай переполнения почтового ящика.
block
Подождать, пока не появится пространство.
throwException
Выбросить исключение MailboxFull.
ignore
Отменить отправку и вернуться.

Переместиться к: 2

void setMaxMailboxSize(Tid tid, size_t messages, OnCrowding doThis);
Устанавливает максимальный размер почтового ящика.
Устанавливает ограничение на максимальное количество сообщений пользователя, разрешённых в почтовом ящике. Если этот предел достигнут, вызывающий поток, пытающийся добавить новое сообщение, выполнит поведение, указанное в doThis. Если messages равно нулю, почтовый ящик неограничен.
Параметры:
Tid tid Tid потока, для которого этот предел должен быть установлен.
size_t messages Максимальное количество сообщений или ноль, если нет ограничений.
OnCrowding doThis Поведение, выполняемое при отправке сообщения в заполненный почтовый ящик.
void setMaxMailboxSize(Tid tid, size_t messages, bool function(Tid) onCrowdingDoThis);
Устанавливает максимальный размер почтового ящика.
Устанавливает ограничение на максимальное количество сообщений пользователя, разрешённых в почтовом ящике. Если это ограничение достигнуто, вызывающий поток, пытающийся добавить новое сообщение, выполнит onCrowdingDoThis. Если messages равно нулю, почтовый ящик неограничен.
Параметры:
Tid tid Tid потока, для которого этот предел должен быть установлен.
size_t messages Максимальное количество сообщений или ноль, если нет ограничений.
bool function(Tid) onCrowdingDoThis Процедура, вызываемая, когда сообщение отправляется в заполненный почтовый ящик.
bool register(string name, Tid tid);
Ассоциирует имя name с tid.
Сопоставляет имя name с tid в локальной области видимости процесса. Когда поток, представленный tid, завершается, любые связанные с ним имена будут автоматически разрегистрированы.
Параметры:
string name Имя, ассоциируемое с tid.
Tid tid Tid, для которого региструруется имя.
Возвращает:
true, если имя name доступно, и tid представляет собой неумерший поток.
bool unregister(string name);
Удаляет зарегистрированное имя name, связанное с tid.
Параметры:
string name Разрегиструруемое имя.
Возвращает:
true, если имя name зарегестрировано, false, если нет.
Tid locate(string name);
Получить Tid, связанный с именем name.
Параметры:
string name Имя для поиска.
Возвращает:
Связанный Tid или Tid.init, если имя name не зарегистрировано.

Переместиться к: cleanup · thisInfo

struct ThreadInfo;
Инкапсулирует все данные уровня реализации, необходимые для планирования.
При определении Scheduler экземпляр этой структуры должен быть связан с каждым логическим потоком. Он содержит всю информацию уровня реализации, необходимую внутреннему API.
static nothrow @property ref auto thisInfo();
Получает локальный для потока экземпляр ThreadInfo.
Получает локальный для потока экземпляр ThreadInfo, который должен использоваться как экземпляр по умолчанию, когда запрашивается информация для потока, не созданного Scheduler'ом.
void cleanup();
Очищает эту ThreadInfo.
Нужно вызывать при завершении запланированного потока. Демонтирует систему обмена сообщениями для потока и уведомляет заинтересованные стороны о завершении потока.

Переместиться к: newCondition · spawn · start · thisInfo · yield

interface Scheduler;
Планировщик, управляющий тем, как потоки выполняются при spawn.
Реализация Scheduler позволяет настроить механизм параллелизма, используемый этим модулем, в соответствии с различными потребностями. По умолчанию вызов spawn создаст новый поток ядра, который выполнит предоставленную процедуру и уничтожится при её завершении. Но можно создавать планировщики, которые повторно используют потоки, которые мультиплексируют нити (сопрограммы) в составе единого потока или любое количество других подходов. Сделав выбор Scheduler опцией уровня пользователя, std.concurrency может использоваться для гораздо большего количества типов приложений, чем если бы это поведение было предопределено.

Пример:

import std.concurrency;
import std.stdio;

void main()
{
    scheduler = new FiberScheduler;
    scheduler.start(
    {
        // Остальная часть main выполняется здесь
        writeln("the rest of main goes here");
    });
}
У некоторых планировщиков есть цикл диспетчеризации, который должен выполняться, чтобы они работали должным образом, поэтому для согласованности, при использовании планировщика, start() необходимо вызывать внутри main(). Это передаёт управление планировщику и гарантирует, что все порождённые потоки будут выполняться ожидаемым образом.

abstract void start(void delegate() op);
Порождает переданный op и запускает Scheduler.
Предполагается, что он будет вызываться в начале программы, чтобы передать все scheduling (yield all scheduling) активному экземпляру планировщика. Это необходимо для планировщиков, которые явно управляют потоками, а не просто полагаются для этого на операционную систему, и поэтому start всегда нужно вызывать внутри main(), чтобы начать нормальное выполнение программы.
Параметры:
void delegate() op Обертка для любого основного потока, который выполнялся бы в отсутствие настраиваемого планировщика. Scheduler автоматически выполнит его через вызов spawn.
abstract void spawn(void delegate() op);
Назначает логический поток для выполнения переданного op.
Эта процедура вызывается через spawn. Ожидается, что он создаст новый логический поток и запустит операцию. Этот поток должен вызвать thisInfo.cleanup() при завершении, если запланированный поток не является потоком ядра – все потоки ядра будут автоматически очищаться от ThreadInfo локальным деструктором потока.
Параметры:
void delegate() op Функция для выполнения. Это может быть настоящей функцией, переданной пользователем для порождения её самой, или может быть оберткой функции.
abstract nothrow void yield();
Обеспечивает выполнение в другом логическом потоке.
Эта процедура вызывается в разных местах в API-интерфейсах, совместимых с параллелизмом, чтобы предоставить планировщику возможность выполнить yield при использовании какой-то совместной модели многопоточности. Если это не подходит, например, когда каждый логический поток поддерживается выделенным потоком ядра, эта процедура может быть отключена.
abstract nothrow @property ref ThreadInfo thisInfo();
Возвращает соответствующий экземпляр ThreadInfo.
Возвращает экземпляр ThreadInfo, специфичный для логического потока, который вызывает эту процедуру, или, если вызывающий поток не был создан этим планировщиком, вместо этого возвращает ThreadInfo.thisInfo.
abstract nothrow Condition newCondition(Mutex m);
Создает аналог переменной состояния для сигнализации.
Создает новый аналог переменной Condition (состояние), который используется для проверки и сигнализации о добавлении сообщений в очередь сообщений потока. Как и в случае yield, некоторым планировщикам может потребоваться определить пользовательское поведение, чтобы вызовы Condition.wait() вместо блокировки передавали управление к другому потоку, если новые сообщения не доступны.
Параметры:
Mutex m Мьютекс, который будет связан с этим состоянием. Он будет заблокирован до любой операции с этим состоянием, и поэтому в некоторых случаях планировщику может потребоваться удерживать ссылку на него и разблокировать мьютекс, прежде чем передать выполнение в другой логический поток.

Переместиться к: newCondition · spawn · start · thisInfo · yield

class ThreadScheduler: std.concurrency.Scheduler;
Пример планировщика Scheduler, использующий потоки ядра.
Это экземпляр планировщика, который отражает поведение планирования по умолчанию, в котором создаётся однин поток ядра для каждого вызова spawn. Он полностью функционален, и его можно создавать и использовать, но не является необходимой частью функционирования данного модуля по умолчанию.
void start(void delegate() op);
Просто запускает op напрямую, поскольку при таком подходе не требуется никакого реального планирования.
void spawn(void delegate() op);
Создает новый поток ядра и назначает его для запуска переданного op.
nothrow void yield();
Этот планировщик не имеет явного мультиплексирования, поэтому это пустая операция.
nothrow @property ref ThreadInfo thisInfo();
Возвращает ThreadInfo.thisInfo, так как это локальный экземпляр потока ThreadInfo, что является правильным поведением для этого планировщика.
nothrow Condition newCondition(Mutex m);
Создает новую переменную Condition. Здесь не нужно никакого нестандартного поведения.

Переместиться к: newCondition · spawn · start · thisInfo · yield

class FiberScheduler: std.concurrency.Scheduler;
Экземпляр планировщика с использованием нитей (Fiber, класс определённый в модуле core.thread – прим.пер.).
Это экземпляр планировщика, который создаёт новую нить при каждом вызове spawn и мультиплексирует выполнение всех нитей в основном потоке.
void start(void delegate() op);
Создает новую нить для переданного op, а затем запускает диспетчер.
nothrow void spawn(void delegate() op);
Создает новую нить для переданного op и добавляет её в список диспетчеризации.
nothrow void yield();
Если вызывается из планируемой нити, это приведёт к передаче выполнения другой запланированной нити.
nothrow @property ref ThreadInfo thisInfo();
Возвращает соответствующий экземпляр ThreadInfo.
Возвращает экземпляр ThreadInfo, специфичный для вызывающей нити, если нить была создана этим диспетчером, в противном случае возвращает ThreadInfo.thisInfo.
nothrow Condition newCondition(Mutex m);
Возвращает аналог Condition (состояние), который передаётся при вызове wait или notify.
Scheduler scheduler;
Устанавливает поведение планировщика в программе.
Эта переменная задает поведение планировщика Scheduler в этой программе. Как правило, при установке планировщика, scheduler.start() следует вызывать в функции main. Выход из этой процедуры не произойдёт, пока не будет завершено выполнение программы.

Переместиться к: 2

nothrow void yield();
Если вызывающий объект является нитью Fiber и не является генератором Generator, эта функция вызовет scheduler.yield() или Fiber.yield(), в зависимости от обстоятельств.

Переместиться к: empty · front · popFront · this

class Generator(T): Fiber, IsGenerator;
Генератор Generator – это нить, которая периодически возвращает значения типа T в вызывающий поток с помощью yield. Действует как входной диапазон InputRange.

Пример:

import std.concurrency;
import std.stdio;


void main()
{
    auto tid = spawn(
    {
        while (true)
        {
            writeln(receiveOnly!int());
        }
    });

    auto r = new Generator!int(
    {
        foreach (i; 1 .. 10)
            yield(i);
    });

    foreach (e; r)
    {
        tid.send(e);
    }
}

Переместиться к: 2 · 3 · 4

this(void function() fn);
Инициализирует объект генератора, связанный со статической D-функцией. Функция будет вызываться один раз, чтобы подготовить диапазон для итерации.
Параметры:
void function() fn Функция, передаваемая в нить.

In: fn не должна быть null.

this(void function() fn, size_t sz);
Инициализирует объект генератора, связанный со статической D-функцией. Функция будет вызываться один раз, чтобы подготовить диапазон для итерации.
Параметры:
void function() fn Функция, передаваемая в нить.
size_t sz Размер стека для этой нити.

In: fn не должна быть null.

this(void delegate() dg);
Инициализирует объект генератора, связанный с динамической D-функцией. Функция будет вызываться один раз, чтобы подготовить диапазон для итерации.
Параметры:
void delegate() dg Функция, передаваемая в нить.

In: dg не должен быть null.

this(void delegate() dg, size_t sz);
Инициализирует объект генератора, связанный с динамической D-функцией. Функция будет вызываться один раз, чтобы подготовить диапазон для итерации.
Параметры:
void delegate() dg Функция, передаваемая в нить..
size_t sz Размер стека для этой нити.

In: dg не должен быть null.

final @property bool empty();
Возвращает true, если генератор пуст.
final void popFront();
Получает следующее значение из базовой функции.
final @property T front();
Возвращает последнее сгенерированное значение.
void yield(T)(ref T value);

void yield(T)(T value);
Передает значение value типа T потоку, вызвавшему выполняющийся в настоящее время генератор.
Параметры:
T value Передаваемое значение.

Переместиться к: 2

ref auto initOnce(alias var)(lazy typeof(var) init);
Инициализирует var ленивым значением init потоко-безопасным способом.
Реализация гарантирует, что все потоки, одновременно вызывающие initOnce с одним и тем же аргументом, блокируются до полной инициализации var. Все побочные эффекты init глобально видны впоследствии.
Параметры:
var Инициализируемая переменная
typeof(var) init Ленивое инициализирующее значение
Возвращает:
Ссылку на инициализированную переменную
Примеры:
Типичным вариантом использования является выполнение ленивой, но потокобезопасной инициализации.
static class MySingleton
{
    static MySingleton instance()
    {
        static __gshared MySingleton inst;
        return initOnce!inst(new MySingleton);
    }
}
assert(MySingleton.instance !is null);
ref auto initOnce(alias var)(lazy typeof(var) init, Mutex mutex);
То же самое, что и предыдущая, но вместо общего доступа ко всем экземплярам initOnce принимает отдельный мьютекс mutex.
Следует использовать, чтобы избежать блокировок, когда выражение init ожидает результата другого потока, который также может вызвать initOnce. Используйте с осторожностью.
Параметры:
var Инициализируемая переменная
typeof(var) init Ленивое инициализирующее значение
Mutex mutex Мьютекс для предотвращения гонки условий
Возвращает:
Ссылку на инициализированную переменную
Примеры:
Используйте отдельный мьютекс, когда init блокирует другой поток, который может также вызвать initOnce.
static shared bool varA, varB;
__gshared Mutex m;
m = new Mutex;

spawn({
    // используйте другой мьютекс для varB, чтобы избежать блокировки
    initOnce!varB(true, m);
    ownerTid.send(true);
});
// init зависит от результата потока, порождённого с помощью spaw
initOnce!varA(receiveOnly!bool);
assert(varA == true);
assert(varB == true);