Нажмите "Enter" для перехода к содержанию

Мультипоточность

Наша компиляция PHP поддерживает работу с многопоточностью.

Она нужна для того, чтобы графический интерфейс не подтормаживал во время тяжелых операций ввода-вывода, вычислений, или цикличного мониторинга.

Отвечает за мультипоточность библиотека PHP Parallel, доступная здесь: https://www.php.net/manual/en/book.parallel.php

Мы же разберем только один пример, который возможно понадобится Вам, если Вы захотите программировать с использованием PHPSupreme.

#!/usr/local/supreme/php/bin/php
<?php
use parallel\Runtime;
use parallel\Channel;
$thread = new Runtime();
$channel = Channel::make("test_channel", 1);
$thread_function = function ($var) use ($channel) {
	$n = 0;
	while (true) {
		$n++;
		echo "В этом потоке мы что-нибудь делаем ($n) \n";
		$out = rand(1,1000);
		$channel->send($out);
		sleep(1);
	}
};
$args[0] = "Test message"; 
$thread->run($thread_function, $args);
while (true) {
	$in = $channel->recv();
	echo "Мы получили с потока: $in\n";
	sleep(1);
}

Код — готовый, работающий, и теперь давайте разберем — что тут происходит, и как он работает.

Сама по себе мультипоточность в простейшем случае состоит из нескольких частей.

  • Собственно сама функция, которая выполняется во втором потоке. В нашем случае это просто лямбда — функция-в-коде. Будьте внимательны, с многопоточностью: для каждого потока создается отдельный экземпляр окружения, так что функции и переменные доступные в общем коде — здесь окажутся недоступными.
  • Обмен сообщениями между функцией, выполняющейся во втором потоке и главным потоком.

А теперь давайте разберем подробнее:

use parallel\Runtime;
use parallel\Channel;
$thread = new Runtime();
$channel = Channel::make("test_channel", 1);

Здесь мы подключаем классы библиотеки, и создаем экземпляры. $thread — это идентификатор рантайма, иначе говоря указатель на второй поток. $channel — это идентификатор канала для обмена информацией между потоками.


$thread_function = function ($var) use ($channel) {
	$n = 0;
	while (true) {
		$n++;
		echo "В этом потоке мы что-нибудь делаем ($n) \n";
		$out = rand(1,1000);
		$channel->send($out);
		sleep(1);
	}
};

Лямбда-функция. В ней мы используем бесконечный цикл, в котором делаем какие-нибудь действия, например выводим текст.

$channel->send($out); — это функция, которая отправляет данные из этого потока в общий поток.


$args[0] = "Test message";
$thread->run($thread_function, $args);

Заполняем необходимые данные, и запускаем второй поток, куда передаем эти данные.

Как было написано выше, второй поток не имеет доступа к данным первого потока. И единственный способ передать туда данные — через функцию, которая принимает в себя массив.

Передать во второй поток вы можете любые сериализированные данные, кроме ресурсов (к примеру дескрипторов наподобие $fp = fopen) и замыканий, поскольку они находятся в одном рантайме. То есть string, int, bool, array — и их комбинации вы можете свободно передавать как элемент массива.

Соответственно в функции второго потока, вы можете обращаться к этим данным, как к элементам массива. Если бы я в коде функции вызвал echo $var[0], то получил бы «Test message».


while (true) {
	$in = $channel->recv();
	echo "Мы получили с потока: $in\n";
	sleep(1);
}

А это уже основной поток, в котором мы запускаем еще один бесконечный цикл.

$in = $channel->recv(); — функция получения данных из второго потока по каналу $channel

Здесь стоит уточнить одну важную вещь. Функции $channel->send() и $channel->recv() являются взаимоблокирующими. Что это значит?

Это значит что функция $channel->send() из второго потока, остановит выполнение второго потока, пока ее данные не будут прочитаны в первом потоке. Соответственно функция $channel->recv() из первого потока так же само его остановит до тех пор, пока второй поток не отправит данные.

Поэтому, чтобы избежать взаимных блокировок, нам нужно просто периодически вызывать функцию $channel->send() во втором потоке. Если у нас нет данных, мы можем отправлять пустое значение, но главное отправлять.

Ну и само собой, в первом потоке функция $channel->recv() тоже должна вызываться периодически, чтобы не допустить зависания второго потока.

Видоизменим скрипт, чтобы увидеть в действии.

#!/usr/local/supreme/php/bin/php
<?php
use parallel\Runtime;
use parallel\Channel;
$thread = new Runtime();
$channel = Channel::make("test_channel", 1);
$thread_function = function ($var) use ($channel) {
	print_r($var); // Распечатываем данные, которые мы передали из первого потока.
	$n = 0;
	while (true) { // Запускаем бесконечный цикл
		$n++;
		echo "В этом потоке мы что-нибудь делаем ($n) \n";
		$out = null; // Устанавливаем переменную для канала как пустую.
		if (rand(1,5)==3) { // Если рандом выпал на 3, то
			$out = "3";        // устанавливаем переменную для канала в "3"
		}
		$channel->send($out); // Шлем данные в канал, пустые или нет - неважно.
		usleep(500000); // Ждем 0.5 сек.
	}
};
$args[0] = "Test message"; // Заполняем данные, которые мы хотим передать в функцию, которую будем вызывать во втором потоке.
$thread->run($thread_function, $args); // Запускаем ее.
while (true) {
	$in = $channel->recv(); // Останавливаем скрипт до получения данных из канала $channel
	if (isset($in)) { // Только если данные есть - их выводим.
		echo "Мы получили с потока: $in\n";
	}
	usleep(500000); // Ждем 0.5 с.
}

Запустив такой код, вы будете видеть постоянную работу второго потока, и периодический вывод цифры «3», если функция rand(1,5) на нее попадет.

Как видите — ничего сложного.