Параллельное выполнение


Предыдущая страница
Следующая страница  

Параллелизм делает возможным запуск программы по нескольким потокам одновременно. Примером параллельной программы является веб-сервер, одновременно реагирующий на множество клиентов. Параллельное выполнение осуществляется достаточно просто при передаче сообщений, но может быть очень сложным, если оно основано на совместном использовании данных.

Работу с параллельным выполнением обеспечивает модуль стандартной библиотеки std.concurrency – прим.пер.

Данные, передаваемые между потоками, называются сообщениями. Сообщения могут состоять из переменных любого типа и любого количества. Каждый поток имеет идентификатор, который используется для указания получателей сообщений. Любой поток, который запускает другой поток, называется владельцем нового потока.

Инициирование потоков в D

Функция spawn() принимает указатель в качестве параметра и запускает новый поток, стартующий с этой функции. Любые операции, выполняемые этой функцией, включая другие функции, которые она может вызвать, будут выполняться в новом потоке. Owner (владелец) и worker (работник) начинают выполняться отдельно, как если бы они были независимыми программами.

Пример

import std.stdio; 
import std.stdio; 
import std.concurrency; 
import core.thread;
  
void worker(int a) { 
   foreach (i; 0 .. 4) { 
      Thread.sleep(dur!"seconds"(1)); 
      writeln("Рабочий поток ",a + i); 
   } 
}

void main() { 
   foreach (i; 1 .. 4) { 
      Thread.sleep(dur!"seconds"(2)); 
      writeln("Главный поток ",i); 
      spawn(&worker, i * 5); 
   }
   
   writeln("Функция main завершена.");  
}

Когда вы скомпилируете и выполните эту программу, она возвратит результат, похожий на это:

Главный поток 1
Рабочий поток 5
Главный поток 2
Рабочий поток 6
Рабочий поток 7
Рабочий поток 10
Главный поток 3
Функция main завершена.
Рабочий поток 8
Рабочий поток 11
Рабочий поток 15
Рабочий поток 12
Рабочий поток 16
Рабочий поток 13
Рабочий поток 17
Рабочий поток 18

Идентификаторы потоков в D

Переменная thisTid, доступная глобально на уровне модуля, всегда является идентификатором текущего потока. Также вы можете получить threadId, когда вызывается spawn. Пример показан ниже.

Пример

import std.stdio; 
import std.concurrency;  

void printTid(string tag) { 
   writefln("%s: %s, адрес: %s", tag, thisTid, &thisTid); 
} 
 
void worker() { 
   printTid("Рабочий поток "); 
}
  
void main() { 
   Tid myWorker = spawn(&worker); 
   
   printTid("Главный поток "); 
   
   writeln(myWorker); 
}

Когда вы скомпилируете и выполните эту программу, она возвратит результат, похожий на это:

Главный поток : Tid(std.concurrency.MessageBox), адрес: 46C958
Tid(std.concurrency.MessageBox)
Рабочий поток : Tid(std.concurrency.MessageBox), адрес: 46C958

Передача сообщений в D

Функция send() отправляет сообщения, а функция receiveOnly() ожидает сообщения определенного типа. Существуют и другие подобные функции prioritySend(), receive() и receiveTimeout(), которые объясняются далее.

Главный поток в следующей программе отправляет своему рабочему потоку сообщение типа int и ожидает сообщения от работника типа double. Потоки продолжают отправлять сообщения туда и обратно, пока главный поток не отправит отрицательное целое число. Пример показан ниже.

Пример

import std.stdio; 
import std.concurrency; 
import core.thread; 
import std.conv;  

void workerFunc(Tid tid) { 
   int value = 0;  
   while (value >= 0) { 
      value = receiveOnly!int(); 
      auto result = to!double(value) * 5; tid.send(result);
   }
} 
 
void main() { 
   Tid worker = spawn(&workerFunc,thisTid); 
    
   foreach (value; 5 .. 10) { 
      worker.send(value); 
      auto result = receiveOnly!double(); 
      writefln("отослано: %s, принято: %s", value, result); 
   }
   
   worker.send(-1); 
} 

Когда вы скомпилируете и выполните эту программу, она возвратит следующий результат:

отослано: 5, принято: 25
отослано: 6, принято: 30
отослано: 7, принято: 35
отослано: 8, принято: 40
отослано: 9, принято: 45

Передача сообщения с ожиданием в D

Простой пример с передачей сообщений и ожиданием показан ниже.

import std.stdio; 
import std.concurrency; 
import core.thread; 
import std.conv; 
 
void workerFunc(Tid tid) { 
   Thread.sleep(dur!("msecs")( 500 ),); 
   tid.send("привет"); 
}
  
void main() { 
   spawn(&workerFunc,thisTid);  
   writeln("Ожидание сообщения");  
   bool received = false;
   
   while (!received) { 
      received = receiveTimeout(dur!("msecs")( 100 ), (string message) { 
         writeln("принято: ", message); 
      });

      if (!received) { 
         writeln("... пока нет сообщения"); 
      }
   } 
}

Когда вы скомпилируете и выполните эту программу, она возвратит следующий результат:

Ожидание сообщения
... пока нет сообщения
... пока нет сообщения
... пока нет сообщения
... пока нет сообщения
... пока нет сообщения
принято: привет

Предыдущая страница
Следующая страница