Глава 17 Thinking in Java 4th edition |
ПАРАЛЛЕЛЬНОЕ ВЫПОЛНЕНИЕ
До настоящего момента мы имели дело исключительно с последовательным программированием. Все действия, выполняемые программой, выполнялись друг за другом, то есть последовательно.ПАРАЛЛЕЛЬНОЕ ВЫПОЛНЕНИЕ
До настоящего момента мы имели дело исключительно с последовательным программированием. Все действия, выполняемые программой, выполнялись друг за другом, то есть последовательно.
Последовательное программирование способно решить многие задачи. Однако для некоторых задач бывает удобно (и даже необходимо) организовать параллельное выполнение нескольких частей программы, чтобы создать у пользователя впечатление одновременного выполнения этих частей, или — если на компьютере установлено несколько процессоров — чтобы они действительно выполнялись одновременно.
Каждая из этих самостоятельных подзадач называется потоком (thread). Программа пишется так, словно каждый поток запускается сам по себе и использует процессор в монопольном режиме. На самом деле существует некоторый системный механизм, который обеспечивает совместное использованием процессора, но в основном думать об этом вам не придется. Модель потоков (и ее поддержка в языке Java) является программным механизмом, упрощающим одновременное выполнение нескольких операций в одной и той же программе.
Процессор периодически вмешивается в происходящие события, выделяя каждому потоку некоторой отрезок времени. Для каждого потока все выглядит так, словно процессор используется в монопольном режиме, но на самом деле время процессора разделяется между всеми существующими в программе потоками (исключение составляет ситуация, когда программа действительно выполняется на многопроцессорном компьютере). Однако при использовании потоков вам не нужно задумываться об этих тонкостях — код не зависит от того, на скольких процессорах вам придется работать. Таким образом, потоки предоставляют механизм масштабирования производительности — если программа работает слишком медленно, вы в силах легко ускорить ее, установив на компьютер дополнительные процессоры. Многозадачность и многопоточность являются, похоже, наиболее вескими причинами использования многопроцессорных систем.
Задачи
Программный поток представляет некоторую задачу или операцию, поэтому нам понадобятся средства для описания этой задачи. Их предоставляет интерфейс Runnable. Чтобы определить задачу, реализуйте Runnable и напишите метод run(), содержащий код выполнения нужных действий.
Например, задача LiftOff выводит обратный отсчет перед стартом:
//: concurrency/LiftOff.java
// Реализация интерфейса Runnable.
public class LiftOff implements Runnable {
protected int countDown = 10; // Значение по умолчанию
private static int taskCount = 0;
private final int id = taskCount++;
public LiftOff() {}
public LiftOff(int countDown) {
this.countDown = countDown;
}
public String status() {
return "#" + id + "(" +
(countDown > 0 ? countDown : "Liftoff!") + "), ";
}
public void run() {
while(countDown-- > 0) {
System.out.print(status());
Thread.yield();
}
}
}
По идентификатору id различаются экземпляры задачи. Поле объявлено с ключевым словом final, поскольку оно не будет изменяться после инициализации.
Метод run() обычно содержит некоторый цикл, который продолжает выполняться до тех пор, пока не будет достигнуто некоторое завершающее условие. Следовательно, вы должны задать условие выхода из цикла (например, просто вернуть управление командой return). Часто run() выполняется в виде бесконечного цикла, а это означает, что при отсутствии завершающего условия выполнение будет продолжаться бесконечно (позднее в этой главе вы узнаете, как организовать безопасное завершение задач).
Вызов статического метода Thread.yield() в run() обращен к планировщику потоков (часть потокового механизма Java, обеспечивающая переключение процессора между потоками). Фактически он означает, что очередная важная часть цикла была выполнена и теперь можно на время переключиться на другую задачу. Вызов yield() не обязателен, но в данном примере он обеспечивает более интересные результаты: вы с большей вероятностью увидите, что программный поток прерывает и возобновляет свою работу.
В следующем примере метод run() не выделяется в отдельный программный поток, а просто вызывается напрямую в main() (впрочем, поток все же используется — тот, который всегда создается для main()):
//: concurrency/MainThread.java
public class MainThread {
public static void main(String[] args) {
LiftOff launch = new LiftOff();
launch.run();
}
}
<spoiler text="Output:">
#0(9), #0(8), #0(7), #0(6), #0(5), #0(4), #0(3), #0(2), #0(1), #0(Liftoff!),
</spoiler>
Класс, реализующий Runnable, должен содержать метод run(), но ничего особенного в этом методе нет — он не обладает никакими особыми потоковыми возможностями. Чтобы использовать многопоточное выполнение, необходимо явно связать задачу с потоком.
Класс Thread
Традиционный способ преобразования объекта Runnable в задачу заключается в передаче его конструктору Thread. Следующий пример показывает, как организовать выполнение LiftOff с использованием Thread:
//: concurrency/BasicThreads.java
// Простейший вариант использования класса Thread..
public class BasicThreads {
public static void main(String[] args) {
Thread t = new Thread(new LiftOff());
t.start();
System.out.println("Waiting for LiftOff");
}
}
<spoiler text="Output:"> (90% match)
Waiting for LiftOff
#0(9), #0(8), #0(7), #0(6), #0(5), #0(4), #0(3), #0(2), #0(1), #0(Liftoff!),
</spoiler>
Конструктору Thread передается только объект Runnable. Метод start() выполняет необходимую инициализацию потока, после чего вызов метода run() интерфейса Runnable запускает задачу на выполнение в новом потоке.
Из выходных данных видно, что вызов start() быстро возвращает управление (сообщение «Waiting for LiftOff» появляется до завершения отсчета). В сущности, мы вызываем LiftOff.run(), а этот метод еще не завершил свое выполнение; но, поскольку LiftOff.run() выполняется в другом потоке, в потоке main() в это время можно выполнять другие операции. (Данная возможность не ограничивается потоком main() — любой поток может запустить другой поток.) Получается, что программа выполняет два метода сразу — main() и LiftOff.run().
В программе можно легко породить дополнительные потоки для выполнения дополнительных задач:
//: concurrency/MoreBasicThreads.java
// Добавление новых потоков.
public class MoreBasicThreads {
public static void main(String[] args) {
for(int i = 0; i < 5; i++)
new Thread(new LiftOff()).start();
System.out.println("Waiting for LiftOff");
}
}
<spoiler text="Output:"> (Sample)
Waiting for LiftOff
#0(9), #1(9), #2(9), #3(9), #4(9), #0(8), #1(8), #2(8), #3(8), #4(8), #0(7), #1(7),
#2(7), #3(7), #4(7), #0(6), #1(6), #2(6), #3(6), #4(6), #0(5), #1(5), #2(5), #3(5),
#4(5), #0(4), #1(4), #2(4), #3(4), #4(4), #0(3), #1(3), #2(3), #3(3), #4(3), #0(2),
#1(2), #2(2), #3(2), #4(2), #0(1), #1(1), #2(1), #3(1), #4(1), #0(Liftoff!),
#1(Liftoff!), #2(Liftoff!), #3(Liftoff!), #4(Liftoff!)
</spoiler>
Из выходных данных видно, что задачи выполняются одновременно друг с другом, с поочередной активизацией и выгрузкой потоков. Переключение осуществляется автоматически планировщиком потоков. Если на компьютере установлено несколько процессоров, планировщик потоков автоматически распределяет потоки между разными процессорами.
При разных запусках программы будут получены разные результаты, поскольку работа планировщика потоков недетерминирована. Более того, вы наверняка увидите значительные различия в результатах работы данной программы-примера, запуская ее на различных версиях пакета JDK. К примеру, предыдущие версии JVM не слишком часто выполняли квантование времени, соответственно, поток 1 мог первым закончить свой цикл, затем все свои итерации произвел бы поток 2, и т. д. Фактически то же самое получилось бы, если бы вызывалась процедура, выполняющая все циклы одновременно, за тем исключением, что запуск совокупности потоков требует больших издержек. Более поздние версии JDK обеспечивают более качественное квантование, и каждый поток регулярно получает свою долю внимания. Как правило, Sun не упоминает о подобных изменениях, так что рассчитывать на определенные «правила поведения» потоков не стоит. Лучше всего при написании кода с потоками занять максимально консервативную позицию.
Когда метод main() создает объекты-потоки Thread, он не сохраняет на них ссылки. Обычный объект, «забытый» таким образом, стал бы легкой добычей сборщика мусора, но только не объект-поток Thread. Каждый поток (Thread) самостоятельно «регистрирует» себя, то есть на самом деле ссылка на него где-то существует, и сборщик мусора не вправе удалить его объект.
Исполнители
Исполнители (executors), появившиеся в библиотеке java.util.concurrent в Java SE5, упрощают многозадачное программирование за счет автоматизации управления объектами Thread. Они создают дополнительную логическую прослойку между клиентом и выполнением задачи; задача выполняется не напрямую клиентом, а промежуточным объектом. Исполнители позволяют управлять выполнением асинхронных задач без явного управления жизненным циклом потоков. Именно такой способ запуска задач рекомендуется использовать в Java SE5/6.
Вместо явного создания объектов Thread в MoreBasicThreads.java мы можем воспользоваться исполнителем. Объект LiftOff умеет выполнять определенную операцию и предоставляет единственный метод для выполнения. Объект ExecutorService умеет создавать необходимый контекст для выполнения объектов Runnable. В следующем примере класс CachedThreadPool создает один поток для каждой задачи. Обратите внимание: объект ExecutorService создается статическим методом класса Executors, определяющим разновидность исполнителя:
//: concurrency/CachedThreadPool.java
import java.util.concurrent.*;
public class CachedThreadPool {
public static void main(String[] args) {
ExecutorService exec = Executors.newCachedThreadPool();
for(int i = 0; i < 5; i++)
exec.execute(new LiftOff());
exec.shutdown();
}
}
<spoiler text="Output:"> (Sample)
#0(9), #0(8), #1(9), #2(9), #3(9), #4(9), #0(7), #1(8), #2(8), #3(8), #4(8),
#0(6), #1(7), #2(7), #3(7), #4(7), #0(5), #1(6), #2(6), #3(6), #4(6), #0(4),
#1(5), #2(5), #3(5), #4(5), #0(3), #1(4), #2(4), #3(4), #4(4), #0(2), #1(3),
#2(3), #3(3), #4(3), #0(1), #1(2), #2(2), #3(2), #4(2), #0(Liftoff!), #1(1),
#2(1), #3(1), #4(1), #1(Liftoff!), #2(Liftoff!), #3(Liftoff!), #4(Liftoff!),
</spoiler>
Очень часто для создания и управления всеми задачами в системе достаточно одного исполнителя.
Вызов shutdown() предотвращает передачу Executor новых задач. Текущий поток (в данном случае тот, в котором выполняется main()) продолжает выполняться со всеми задачами, переданными до вызова shutdown(). Работа программы прекращается после завершения всех задач в Executor.
CachedThreadPool в этом примере легко заменяется другим типом Executor. Например, в потоковом пуле фиксированного размера (FixedThreadPool) используется ограниченный набор потоков для выполнения переданных задач:
//: concurrency/FixedThreadPool.java
import java.util.concurrent.*;
public class FixedThreadPool {
public static void main(String[] args) {
// В аргументе конструктора передается количество потоков:
ExecutorService exec = Executors.newFixedThreadPool(5);
for(int i = 0; i < 5; i++)
exec.execute(new LiftOff());
exec.shutdown();
}
}
<spoiler text="Output:"> (Sample)
#0(9), #0(8), #1(9), #2(9), #3(9), #4(9), #0(7), #1(8), #2(8), #3(8), #4(8),
#0(6), #1(7), #2(7), #3(7), #4(7), #0(5), #1(6), #2(6), #3(6), #4(6), #0(4),
#1(5), #2(5), #3(5), #4(5), #0(3), #1(4), #2(4), #3(4), #4(4), #0(2), #1(3),
#2(3), #3(3), #4(3), #0(1), #1(2), #2(2), #3(2), #4(2), #0(Liftoff!), #1(1),
#2(1), #3(1), #4(1), #1(Liftoff!), #2(Liftoff!), #3(Liftoff!), #4(Liftoff!),
</spoiler>
С FixedThreadPool дорогостоящая операция создания потоков выполняется только один раз, в самом начале, поэтому количество потоков остается фиксированным. Это способствует экономии времени, поскольку вам не приходится нести затраты, связанные с созданием потока, для каждой отдельной задачи. В системах, управляемых событиями, обеспечивается максимальная скорость выполнения обработчиков событий, так как они могут просто получить поток из пула. Перерасход ресурсов в такой схеме исключен, так как FixedThreadPool использует ограниченное количество объектов Thread.
Исполнитель SingleThreadExecutor представляет собой аналог FixedThreadPool с одним потоком. Например, он может быть полезен для выполнения долгосрочных операций (скажем, прослушивания входящих подключений по сокету) и для коротких операций, выполняемых в отдельных потоках, — скажем, обновления локальных или удаленных журналов.
Если SingleThreadExecutor передается более одной задачи, эти задачи ставятся в очередь, и каждая из них отрабатывает до завершения перед началом следующей задачи, причем все они используют один и тот же поток. В следующем примере мы видим, что задачи последовательно завершаются в порядке их поступления. Таким образом, SingleThreadExecutor организует последовательное выполнение поступающих задач и поддерживает свою (внутреннюю) очередь незавершенных задач.
//: concurrency/SingleThreadExecutor.java
import java.util.concurrent.*;
public class SingleThreadExecutor {
public static void main(String[] args) {
ExecutorService exec =
Executors.newSingleThreadExecutor();
for(int i = 0; i < 5; i++)
exec.execute(new LiftOff());
exec.shutdown();
}
}
<spoiler text="Output:">
#0(9), #0(8), #0(7), #0(6), #0(5), #0(4), #0(3), #0(2), #0(1), #0(Liftoff!),
#1(9), #1(8), #1(7), #1(6), #1(5), #1(4), #1(3), #1(2), #1(1), #1(Liftoff!),
#2(9), #2(8), #2(7), #2(6), #2(5), #2(4), #2(3), #2(2), #2(1), #2(Liftoff!),
#3(9), #3(8), #3(7), #3(6), #3(5), #3(4), #3(3), #3(2), #3(1), #3(Liftoff!),
#4(9), #4(8), #4(7), #4(6), #4(5), #4(4), #4(3), #4(2), #4(1), #4(Liftoff!),
</spoiler>
Другой пример: допустим, имеется группа потоков, выполняющих операции с использованием файловой системы. Вы можете запустить эти задачи под управлением SingleThreadExecutor, чтобы в любой момент гарантированно выполнялось не более одной задачи. При таком подходе вам не придется возиться с синхронизацией доступа к общим ресурсам (без риска для целостности файловой системы). Возможно, в окончательной версии кода будет правильнее синхронизировать доступ к ресурсу (см. далее в этой главе), но SingleThreadExecutor позволит быстро организовать координацию доступа при построении рабочего прототипа.
Возврат значений из задач
Интерфейс Runnable представляет отдельную задачу, которая выполняет некоторую работу, но не возвращает значения. Если вы хотите, чтобы задача возвращала значение, реализуйте интерфейс Callable вместо интерфейса Runnable. Параметризованный интерфейс Callable, появившийся в Java SE5, имеет параметр типа, представляющий возвращаемое значение метода call() (вместо run()), а для его вызова должен использоваться метод ExecutorService.submit(). Простой пример:
//: concurrency/CallableDemo.java
import java.util.concurrent.*;
import java.util.*;
class TaskWithResult implements Callable<String> {
private int id;
public TaskWithResult(int id) {
this.id = id;
}
public String call() {
return "result of TaskWithResult " + id;
}
}
public class CallableDemo {
public static void main(String[] args) {
ExecutorService exec = Executors.newCachedThreadPool();
ArrayList<Future<String>> results =
new ArrayList<Future<String>>();
for(int i = 0; i < 10; i++)
results.add(exec.submit(new TaskWithResult(i)));
for(Future<String> fs : results)
try {
// Вызов get() блокируется до завершения;:
System.out.println(fs.get());
} catch(InterruptedException e) {
System.out.println(e);
return;
} catch(ExecutionException e) {
System.out.println(e);
} finally {
exec.shutdown();
}
}
}
<spoiler text="Output:">
result of TaskWithResult 0
result of TaskWithResult 1
result of TaskWithResult 2
result of TaskWithResult 3
result of TaskWithResult 4
result of TaskWithResult 5
result of TaskWithResult 6
result of TaskWithResult 7
result of TaskWithResult 8
result of TaskWithResult 9
</spoiler>
Метод submit() создает объект Future, параметризованный по типу результата, возвращаемому Callable. Вы можете обратиться к Future с запросом isDone(), чтобы узнать, завершена ли операция. После завершения задачи и появления результата производится его выборка методом get(). Если get() вызывается без предварительной проверки isDone(), вызов блокируется до появления результата. Также можно вызвать get() с интервалом тайм-аута.
Перегруженный метод Executors.callable() получает Runnable и выдает Callable. ExecutorService содержит методы для выполнения коллекций объектов Callable.
Ожидание
Другим способом управления вашими потоками является вызов метода sleep(), который переводит поток в состояние ожидания на заданное количество миллисекунд. Если в классе LiftOff заменить вызов yield() на вызов метода sleep(), будет получен следующий результат:
//: concurrency/SleepingTask.java
// Вызов sleep() для приостановки потока.
import java.util.concurrent.*;
public class SleepingTask extends LiftOff {
public void run() {
try {
while(countDown-- > 0) {
System.out.print(status());
// Старый стиль.
// Thread.sleep(l00);
// Стиль Java SE5/6:
TimeUnit.MILLISECONDS.sleep(100);
}
} catch(InterruptedException e) {
System.err.println("Interrupted");
}
}
public static void main(String[] args) {
ExecutorService exec = Executors.newCachedThreadPool();
for(int i = 0; i < 5; i++)
exec.execute(new SleepingTask());
exec.shutdown();
}
}
<spoiler text="Output:">
#0(9), #1(9), #2(9), #3(9), #4(9), #0(8), #1(8), #2(8), #3(8), #4(8), #0(7),
#1(7), #2(7), #3(7), #4(7), #0(6), #1(6), #2(6), #3(6), #4(6), #0(5), #1(5),
#2(5), #3(5), #4(5), #0(4), #1(4), #2(4), #3(4), #4(4), #0(3), #1(3), #2(3),
#3(3), #4(3), #0(2), #1(2), #2(2), #3(2), #4(2), #0(1), #1(1), #2(1), #3(1),
#4(1), #0(Liftoff!), #1(Liftoff!), #2(Liftoff!), #3(Liftoff!), #4(Liftoff!),
</spoiler>
Вызов метода sleep() может привести к исключению InterruptedException; перехват этого исключения продемонстрирован в run(). Поскольку исключения не распространяются по потокам обратно в main(), вы должны локально обработать любые исключения, возникающие внутри задачи.
В Java SE5 появилась новая версия sleep(), оформленная в виде метода класса TimeUnit; она продемонстрирована в приведенном примере. Она делает программу более наглядной, поскольку вы можете указать единицы измерения продолжительности задержки. Класс TimeUnit также может использоваться для выполнения преобразований, как будет показано далее в этой главе.
На некоторых платформах задачи выполняются в порядке «идеального распределения» — от 0 до 4, затем снова от 4 до 0. Это вполне логично, поскольку после каждой команды вывода задача переходит в состояние ожидания, что позволяет планировщику потоков переключиться на другой поток. Тем не менее такое поведение зависит от базовой реализации потокового механизма, поэтому полагаться на него нельзя. Если вам потребуется управлять порядком выполнения задач, используйте средства синхронизации (см. далее) или же вообще откажитесь от использования потоков и напишите собственные функции синхронизации, которые передают управление друг другу в нужном порядке.
Приоритет
Приоритет (priority) потока сообщает планировщику информацию об относительной важности потока. Хотя порядок обращения процессора к существующему набору потоков и не детерминирован, если существует несколько приостановленных потоков, одновременно ожидающих запуска, планировщик сначала запустит поток с большим приоритетом. Впрочем, это не значит, что потоки с младшими приоритетами не выполняются вовсе (то есть тупиковых ситуаций из-за приоритетов не возникает). Потоки с более низкими приоритетами просто запускаются чуть реже.
В подавляющем большинстве случаев все потоки должны выполняться со стандартным приоритетом. Любые попытки манипуляций с приоритетами обычно являются ошибкой.
Следующий пример демонстрирует использование приоритетов. Приоритет существующего потока читается методом getPriority() и задается методом setPriority():
//: concurrency/SimplePriorities.java
// Использование приоритетов потоков.
import java.util.concurrent.*;
public class SimplePriorities implements Runnable {
private int countDown = 5;
private volatile double d; // Без оптимизации
private int priority;
public SimplePriorities(int priority) {
this.priority = priority;
}
public String toString() {
return Thread.currentThread() + ": " + countDown;
}
public void run() {
Thread.currentThread().setPriority(priority);
while(true) {
// Высокозатратная, прерываемая операция:
for(int i = 1; i < 100000; i++) {
d += (Math.PI + Math.E) / (double)i;
if(i % 1000 == 0)
Thread.yield();
}
System.out.println(this);
if(--countDown == 0) return;
}
}
public static void main(String[] args) {
ExecutorService exec = Executors.newCachedThreadPool();
for(int i = 0; i < 5; i++)
exec.execute(
new SimplePriorities(Thread.MIN_PRIORITY));
exec.execute(
new SimplePriorities(Thread.MAX_PRIORITY));
exec.shutdown();
}
}
<spoiler text="Output:"> (70% match)
Thread[pool-1-thread-6,10,main]: 5
Thread[pool-1-thread-6,10,main]: 4
Thread[pool-1-thread-6,10,main]: 3
Thread[pool-1-thread-6,10,main]: 2
Thread[pool-1-thread-6,10,main]: 1
Thread[pool-1-thread-3,1,main]: 5
Thread[pool-1-thread-2,1,main]: 5
Thread[pool-1-thread-1,1,main]: 5
Thread[pool-1-thread-5,1,main]: 5
Thread[pool-1-thread-4,1,main]: 5
</spoiler>
В этой версии метод toString() переопределяется и использует метод Thread. toString(), который выводит имя потока (его можно задать в конструкторе, но здесь имена автоматически генерируются в виде pool-1-thread-1, pool-1-thread-2 и т. д.), приоритет и группу, к которой принадлежит поток. Переопределенная версия toString() также выводит обратный отсчет, выполняемый задачей. Обратите внимание: для получения ссылки на объект Thread, управляющий задачей, внутри самой задачи, следует вызвать метод Thread.currentThread().
Мы видим, что приоритет последнего потока имеет наивысший уровень, а все остальные потоки находятся на низшем уровне. Учтите, что приоритет задается в начале выполнения run(); задавать его в конструкторе бессмысленно, потому что Executor в этот момент еще не начал выполнять задачу.
В метод run() были добавлены 100 000 достаточно затратных операций с плавающей запятой, включая суммирование и деление с числом двойной точности double. Переменная d была отмечена как volatile, чтобы компилятор не применял оптимизацию. Без этих вычислений вы не увидите эффекта установки различных приоритетов (попробуйте закомментировать цикл for с вычислениями). В процессе вычислений мы видим, что планировщик уделяет больше внимания потоку с приоритетом MAX_PRIORITY (по крайней мере, таково было поведение программы на машине под управлением Windows ХР). Несмотря даже на то, что вывод на консоль также является «дорогостоящей» операцией, с ним вы не увидите влияние уровней приоритетов, поскольку вывод на консоль не прерывается (иначе экран был бы заполнен несуразицей), в то время как математические вычисления, приведенные выше, прерывать допустимо. Вычисления выполняются достаточно долго, соответственно, механизм планирования потоков вмешивается в процесс и чередует потоки, проявляя при этом внимание к более приоритетным. Тем не менее для обеспечения переключения контекста в программе периодически выполняются команды yield().
В пакете JDK предусмотрено 10 уровней приоритетов, однако это не слишком хорошо согласуется с большинством операционных систем. К примеру, в Windows имеется 7 классов приоритетов, таким образом, их соотношение неочевидно (хотя в операционной системе Sun Solaris имеется 231 уровней). Переносимость обеспечивается только использованием универсальных констант МАХ_РRIORITY, NORM_PRIORITY и MIN_PRI0RITY.
Передача управления
Если вы знаете, что в текущей итерации run() сделано все необходимое, вы можете подсказать механизму планирования потоков, что процессором теперь может воспользоваться другой поток. Эта подсказка (не более чем рекомендация; нет никакой гарантии, что планировщик потоков «прислушается» к ней) воплощается в форме вызова метода yield(). Вызывая yield(), вы сообщаете системе, что в ней могут выполняться другие потоки того же приоритета.
В примере LiftOff метод yield() обеспечивает равномерное распределение вычислительных ресурсов между задачами LiftOff. Попробуйте закомментировать вызов Thread.yield() в Lift0ff.run() и проследите за различиями. И все же, в общем случае не стоит полагаться на yield() как на серьезное средство настройки вашего приложения.
Потоки-демоны
Демоном называется поток, предоставляющий некоторый сервис, работая в фоновом режиме во время выполнения программы, но при этом не является ее неотъемлемой частью. Таким образом, когда все потоки не-демоны заканчивают свою деятельность, программа завершается. И наоборот, если существуют работающие потоки не-демоны, программа продолжает выполнение. Существует, например, поток не-демон, выполняющий метод main().
//: concurrency/SimpleDaemons.java
// Потоки-демоны не препятствуют завершению работы программы.
import java.util.concurrent.*;
import static net.mindview.util.Print.*;
public class SimpleDaemons implements Runnable {
public void run() {
try {
while(true) {
TimeUnit.MILLISECONDS.sleep(100);
print(Thread.currentThread() + " " + this);
}
} catch(InterruptedException e) {
print("sleep() interrupted");
}
}
public static void main(String[] args) throws Exception {
for(int i = 0; i < 10; i++) {
Thread daemon = new Thread(new SimpleDaemons());
daemon.setDaemon(true); // Необходимо вызвать перед start()
daemon.start();
}
print("All daemons started");
TimeUnit.MILLISECONDS.sleep(175);
}
}
<spoiler text="Output:"> (Sample)
All daemons started
Thread[Thread-0,5,main] SimpleDaemons@530daa
Thread[Thread-1,5,main] SimpleDaemons@a62fc3
Thread[Thread-2,5,main] SimpleDaemons@89ae9e
Thread[Thread-3,5,main] SimpleDaemons@1270b73
Thread[Thread-4,5,main] SimpleDaemons@60aeb0
Thread[Thread-5,5,main] SimpleDaemons@16caf43
Thread[Thread-6,5,main] SimpleDaemons@66848c
Thread[Thread-7,5,main] SimpleDaemons@8813f2
Thread[Thread-8,5,main] SimpleDaemons@1d58aae
Thread[Thread-9,5,main] SimpleDaemons@83cc67
...
</spoiler>
Чтобы назначить поток демоном, следует перед его запуском вызвать метод setDaemon().
После того как main() завершит свою работу, ничто не препятствует завершению программы, поскольку в процессе не работают другие потоки, кроме демонов. Чтобы результаты запуска всех потоков-демонов были более наглядными, поток main() на некоторое время погружается в «сон». Без этого вы увидели бы только часть результатов при создании демонов. (Поэкспериментируйте с вызовом sleep() для интервалов разной продолжительности.)
В примере SimpleDaemons.java используется явное создание объектов Thread для установки их «демонского» флага. Вы также можете настроить атрибуты (демон, приоритет, имя) потоков, созданных исполнителем; для этого следует написать пользовательскую реализацию ThreadFactory:
//: net/mindview/util/DaemonThreadFactory.java
package net.mindview.util;
import java.util.concurrent.*;
public class DaemonThreadFactory implements ThreadFactory {
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setDaemon(true);
return t;
}
}
Единственное отличие от обычной реализации ThreadFactory заключается в том, что в данном случае атрибут демона задается равным true. Теперь новый объект DaemonThreadFactory передается в аргументе Executors.newCachedThreadPool():
//: concurrency/DaemonFromFactory.java
// Использование ThreadFactory для создания демонов.
import java.util.concurrent.*;
import net.mindview.util.*;
import static net.mindview.util.Print.*;
public class DaemonFromFactory implements Runnable {
public void run() {
try {
while(true) {
TimeUnit.MILLISECONDS.sleep(100);
print(Thread.currentThread() + " " + this);
}
} catch(InterruptedException e) {
print("Interrupted");
}
}
public static void main(String[] args) throws Exception {
ExecutorService exec = Executors.newCachedThreadPool(
new DaemonThreadFactory());
for(int i = 0; i < 10; i++)
exec.execute(new DaemonFromFactory());
print("All daemons started");
TimeUnit.MILLISECONDS.sleep(500); // Задержка
}
} /* (Execute to see output) *///:~
Каждый статический метод создания ExecutorService перегружается для получения объекта ThreadFactory, который будет использоваться для создания новых потоков.
Сделаем еще один шаг — создадим вспомогательный класс DaemonThreadPoolExecutor:
//: net/mindview/util/DaemonThreadPoolExecutor.java
package net.mindview.util;
import java.util.concurrent.*;
public class DaemonThreadPoolExecutor
extends ThreadPoolExecutor {
public DaemonThreadPoolExecutor() {
super(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
new DaemonThreadFactory());
}
}
Чтобы узнать, какие значения должны передаваться при вызове конструктора базового класса, я просто заглянул в исходный код Executors.java.
Чтобы узнать, является ли поток демоном, вызовите метод isDaemon(). Если поток является демоном, то все потоки, которые он производит, также будут демонами, что и демонстрируется следующим примером:
//: concurrency/Daemons.java
// Потоки, порождаемые демонами, также являются демонами
// Daemon threads spawn other daemon threads.
import java.util.concurrent.*;
import static net.mindview.util.Print.*;
class Daemon implements Runnable {
private Thread[] t = new Thread[10];
public void run() {
for(int i = 0; i < t.length; i++) {
t[i] = new Thread(new DaemonSpawn());
t[i].start();
printnb("DaemonSpawn " + i + " started, ");
}
for(int i = 0; i < t.length; i++)
printnb("t[" + i + "].isDaemon() = " +
t[i].isDaemon() + ", ");
while(true)
Thread.yield();
}
}
class DaemonSpawn implements Runnable {
public void run() {
while(true)
Thread.yield();
}
}
public class Daemons {
public static void main(String[] args) throws Exception {
Thread d = new Thread(new Daemon());
d.setDaemon(true);
d.start();
printnb("d.isDaemon() = " + d.isDaemon() + ", ");
// Даем потокам-демонам завершить процесс запуска:
TimeUnit.SECONDS.sleep(1);
}
}
<spoiler text="Output:"> (Sample)
d.isDaemon() = true, DaemonSpawn 0 started, DaemonSpawn 1 started, DaemonSpawn 2 started,
DaemonSpawn 3 started, DaemonSpawn 4 started, DaemonSpawn 5 started, DaemonSpawn 6 started,
DaemonSpawn 7 started, DaemonSpawn 8 started, DaemonSpawn 9 started, t[0].isDaemon() = true,
t[1].isDaemon() = true, t[2].isDaemon() = true, t[3].isDaemon() = true, t[4].isDaemon() = true,
t[5].isDaemon() = true, t[6].isDaemon() = true, t[7].isDaemon() = true, t[8].isDaemon() = true,
t[9].isDaemon() = true,
</spoiler>
Поток Daemon переводится в режим демона, а затем порождает группу новых потоков, которые явно не назначаются демонами, но при этом все равно оказываются ими. Затем Daemon входит в бесконечный цикл, на каждом шаге которого вызывается метод yield(), передающий управление другими процессам.
Учтите, что потоки-демоны завершают свои методы run() без выполнения секций finally:
//: concurrency/DaemonsDontRunFinally.java
// Потоки-демоны не выполняют секцию finally.
import java.util.concurrent.*;
import static net.mindview.util.Print.*;
class ADaemon implements Runnable {
public void run() {
try {
print("Starting ADaemon");
TimeUnit.SECONDS.sleep(1);
} catch(InterruptedException e) {
print("Exiting via InterruptedException");
} finally {
print("This should always run?");
}
}
}
public class DaemonsDontRunFinally {
public static void main(String[] args) throws Exception {
Thread t = new Thread(new ADaemon());
t.setDaemon(true);
t.start();
}
}
<spoiler text="Output:">
Starting ADaemon
</spoiler>
Запуск программы наглядно показывает, что секция finally не выполняется. С другой стороны, если закомментировать вызов setDaemon(), вы увидите, что секция finally была выполнена.
Такое поведение верно, даже если из предыдущих описаний finally у вас сложилось обратное впечатление. Демоны завершаются «внезапно», при завершении последнего не-демона. Таким образом, сразу же при выходе из main() JVM немедленно прерывает работу всех демонов, не соблюдая никакие формальности. Невозможность корректного завершения демонов ограничивает возможности их применения. Обычно объекты Executor оказываются более удачным решением, потому что все задачи, находящиеся под управлением Executor, могут быть завершены одновременно.
Варианты кодирования
Во всех предшествующих примерах все классы задач реализовали интерфейс Runnable. В очень простых случаях можно использовать альтернативное решение с прямым наследованием от Thread:
//: concurrency/SimpleThread.java
// Прямое наследование от класса Thread..
public class SimpleThread extends Thread {
private int countDown = 5;
private static int threadCount = 0;
public SimpleThread() {
// Store the thread name:
super(Integer.toString(++threadCount));
start();
}
public String toString() {
return "#" + getName() + "(" + countDown + "), ";
}
public void run() {
while(true) {
System.out.print(this);
if(--countDown == 0)
return;
}
}
public static void main(String[] args) {
for(int i = 0; i < 5; i++)
new SimpleThread();
}
}
<spoiler text="Output:">
#1(5), #1(4), #1(3), #1(2), #1(1), #2(5), #2(4), #2(3), #2(2), #2(1), #3(5),
#3(4), #3(3), #3(2), #3(1), #4(5), #4(4), #4(3), #4(2), #4(1), #5(5), #5(4),
#5(3), #5(2), #5(1),
</spoiler>
Чтобы задать объектам Thread имена, вы вызываете соответствующий конструктор Thread. Имя читается в методе toString() при помощи getName().
Также иногда встречается идиома самоуправляемой реализации Runnable:
//: concurrency/SelfManaged.java
// Реализация Runnable. содержащая собственный объект Thread.
public class SelfManaged implements Runnable {
private int countDown = 5;
private Thread t = new Thread(this);
public SelfManaged() { t.start(); }
public String toString() {
return Thread.currentThread().getName() +
"(" + countDown + "), ";
}
public void run() {
while(true) {
System.out.print(this);
if(--countDown == 0)
return;
}
}
public static void main(String[] args) {
for(int i = 0; i < 5; i++)
new SelfManaged();
}
}
<spoiler text="Output:">
Thread-0(5), Thread-0(4), Thread-0(3), Thread-0(2), Thread-0(1), Thread-1(5), Thread-1(4),
Thread-1(3), Thread-1(2), Thread-1(1), Thread-2(5), Thread-2(4), Thread-2(3), Thread-2(2),
Thread-2(1), Thread-3(5), Thread-3(4), Thread-3(3), Thread-3(2), Thread-3(1), Thread-4(5),
Thread-4(4), Thread-4(3), Thread-4(2), Thread-4(1),
</spoiler>
В целом происходящее не так уж сильно отличается от наследования от Thread, разве что синтаксис получается чуть более громоздким. Однако реализация интерфейса позволяет наследовать от другого класса, тогда как в варианте с Thread это невозможно.
Обратите внимание на вызов start() в конструкторе. Приведенный пример очень прост, поэтому, скорее всего, в нем такое решение безопасно, но вы должны знать, что запуск потоков в конструкторе может создать изрядные проблемы — до завершения конструктора может быть запущена на выполнение другая задача, которая обратится к объекту в нестабильном состоянии. Это еще одна причина, по которой использование Executor предпочтительнее явного создания объектов Thread.
Иногда бывает разумно спрятать потоковый код внутри класса с помощью внутреннего класса, как показано здесь:
//: concurrency/ThreadVariations.java
// Создание потоков с использованием внутренних классов..
import java.util.concurrent.*;
import static net.mindview.util.Print.*;
// Using a named inner class:
class InnerThread1 {
private int countDown = 5;
private Inner inner;
private class Inner extends Thread {
Inner(String name) {
super(name);
start();
}
public void run() {
try {
while(true) {
print(this);
if(--countDown == 0) return;
sleep(10);
}
} catch(InterruptedException e) {
print("interrupted");
}
}
public String toString() {
return getName() + ": " + countDown;
}
}
public InnerThread1(String name) {
inner = new Inner(name);
}
}
// Используем безымянный внутренний класс::
class InnerThread2 {
private int countDown = 5;
private Thread t;
public InnerThread2(String name) {
t = new Thread(name) {
public void run() {
try {
while(true) {
print(this);
if(--countDown == 0) return;
sleep(10);
}
} catch(InterruptedException e) {
print("sleep() interrupted");
}
}
public String toString() {
return getName() + ": " + countDown;
}
};
t.start();
}
}
// Используем именованную реализацию Runnable:
class InnerRunnable1 {
private int countDown = 5;
private Inner inner;
private class Inner implements Runnable {
Thread t;
Inner(String name) {
t = new Thread(this, name);
t.start();
}
public void run() {
try {
while(true) {
print(this);
if(--countDown == 0) return;
TimeUnit.MILLISECONDS.sleep(10);
}
} catch(InterruptedException e) {
print("sleep() interrupted");
}
}
public String toString() {
return t.getName() + ": " + countDown;
}
}
public InnerRunnable1(String name) {
inner = new Inner(name);
}
}
// Используем анонимную реализацию Runnable:
class InnerRunnable2 {
private int countDown = 5;
private Thread t;
public InnerRunnable2(String name) {
t = new Thread(new Runnable() {
public void run() {
try {
while(true) {
print(this);
if(--countDown == 0) return;
TimeUnit.MILLISECONDS.sleep(10);
}
} catch(InterruptedException e) {
print("sleep() interrupted");
}
}
public String toString() {
return Thread.currentThread().getName() +
": " + countDown;
}
}, name);
t.start();
}
}
// Отдельный метод для выполнения кода в потоке:
class ThreadMethod {
private int countDown = 5;
private Thread t;
private String name;
public ThreadMethod(String name) { this.name = name; }
public void runTask() {
if(t == null) {
t = new Thread(name) {
public void run() {
try {
while(true) {
print(this);
if(--countDown == 0) return;
sleep(10);
}
} catch(InterruptedException e) {
print("sleep() interrupted");
}
}
public String toString() {
return getName() + ": " + countDown;
}
};
t.start();
}
}
}
public class ThreadVariations {
public static void main(String[] args) {
new InnerThread1("InnerThread1");
new InnerThread2("InnerThread2");
new InnerRunnable1("InnerRunnable1");
new InnerRunnable2("InnerRunnable2");
new ThreadMethod("ThreadMethod").runTask();
}
} /* (Execute to see output) *///:~
InnerThread1 определяет именованный внутренний класс, производный от Thread, и создает экземпляр этого класса в конструкторе. Поступать так стоит в том случае, когда у внутреннего класса есть особые возможности (новые методы), которые могут понадобиться в других методах. Однако в большинстве случаев причина создания потока — использование функциональности класса Thread, поэтому в именованном внутреннем классе особой нужды нет. InnerThread2 показывает другое решение. В конструкторе создается безымянный внутренний субкласс Thread, преобразуемый восходящим преобразованием к ссылке на Thread.t. Если другим методам класса понадобится обратиться к t, они смогут сделать это через интерфейс Thread, и им не нужно будет знать точный тип объекта.
Третий и четвертый классы примера повторяют первые два, только вместо класса Thread они используют интерфейс Runnable.
Класс ThreadMethod демонстрирует создание потока в методе. Вы вызываете метод, когда программа готова к запуску потока, а метод возвращает управление после запуска потока. Если поток выполняет только вспомогательные операции и не является фундаментальной частью класса, то этот способ, вероятно, удобнее и практичнее запуска потока из конструктора класса.
Присоединение к потоку
Любой поток может вызвать метод join(), чтобы дождаться завершения другого потока перед своим продолжением. Если поток вызывает t.join() для другого потока t, то вызывающий поток приостанавливается до тех пор, пока целевой поток t не завершит свою работу (когда метод t.isAlive() вернет значение false).
Вызвать метод join() можно также и с аргументом, указывающим продолжительность ожидания (в миллисекундах или в миллисекундах с наносекундами). Если целевой поток не закончит работу за означенный период времени, метод join() все равно вернет управление инициатору.
Вызов join() может быть прерван вызовом метода interrupt() для потока-инициатора, поэтому потребуется блок try-catch.
Все эти операции продемонстрированы в следующем примере:
//: concurrency/Joining.java
// Демонстрация join().
import static net.mindview.util.Print.*;
class Sleeper extends Thread {
private int duration;
public Sleeper(String name, int sleepTime) {
super(name);
duration = sleepTime;
start();
}
public void run() {
try {
sleep(duration);
} catch(InterruptedException e) {
print(getName() + " was interrupted. " +
"isInterrupted(): " + isInterrupted());
return;
}
print(getName() + " has awakened");
}
}
class Joiner extends Thread {
private Sleeper sleeper;
public Joiner(String name, Sleeper sleeper) {
super(name);
this.sleeper = sleeper;
start();
}
public void run() {
try {
sleeper.join();
} catch(InterruptedException e) {
print("Interrupted");
}
print(getName() + " join completed");
}
}
public class Joining {
public static void main(String[] args) {
Sleeper
sleepy = new Sleeper("Sleepy", 1500),
grumpy = new Sleeper("Grumpy", 1500);
Joiner
dopey = new Joiner("Dopey", sleepy),
doc = new Joiner("Doc", grumpy);
grumpy.interrupt();
}
}
<spoiler text="Output:">
Grumpy was interrupted. isInterrupted(): false
Doc join completed
Sleepy has awakened
Dopey join completed
</spoiler>
Класс Sleeper — это тип потока, который приостанавливается на время, указанное в его конструкторе. В методе run() вызов метода sleep() может закончиться по истечении времени задержки, но может и прерваться. В секции catch выводится сообщение о прерывании, вместе со значением, возвращаемым методом isInterrupted(). Когда другой поток вызывает interrupt() для данного потока, устанавливается флаг, показывающий, что поток был прерван. Однако этот флаг сбрасывается при обработке исключения, поэтому внутри секции catch результатом всегда будет false. Флаг используется в других ситуациях, где поток может исследовать свое прерванное состояние в стороне от исключения.
Joiner — поток, который ожидает пробуждения потока Sleeper, вызывая для последнего метод join(). В методе main() каждому объекту Joiner сопоставляется Sleeper, и вы можете видеть в результатах работы программы, что, если Sleeper был прерван или завершился нормально, Joiner прекращает работу вместе с потоком Sleeper.
Совместное использование ресурсов
Однопоточную программу можно представить в виде одинокого работника, передвигающегося по своему пространству задачи и выполняющего по одной операции за один раз. Раз работник один, вам не приходится принимать во внимание проблему двух сущностей, пытающихся оспорить право на один и тот же ресурс, подобно двум людям, которые хотят одновременно поставить машину в одном месте, вдвоем пройти в одну дверь или даже говорить одновременно.
В условиях многозадачности ситуация меняется: у вас есть сразу два или три потока, которые стремятся получить доступ к одному и тому же ограниченному ресурсу. Если не предотвратить подобные конфликты, два потока могут попытаться получить доступ к одному счету в банке, одновременно распечатать два документа на одном принтере, изменить одно и то же значение, и т. п.
Некорректный доступ к ресурсам
Рассмотрим следующий пример, в котором одна задача генерирует четные числа, а другие задачи эти числа потребляют. Единственной задачей задач-потребителей является проверка четности этих чисел.
Начнем с определения EvenChecker, задачи-потребителя, поскольку она будет использоваться во всех последующих примерах. Чтобы отделить EvenChecker от различных генераторов, с которыми мы будем экспериментировать, мы определим абстрактный класс IntGenerator, содержащий минимум необходимых методов для EvenChecker: метод для получения следующего значения next() и методы отмены. Класс не реализует интерфейс Generator, потому что он должен выдавать int, а параметризация не поддерживает примитивные параметры.
//: concurrency/IntGenerator.java
public abstract class IntGenerator {
private volatile boolean canceled = false;
public abstract int next();
// Allow this to be canceled:
public void cancel() { canceled = true; }
public boolean isCanceled() { return canceled; }
}
IntGenerator содержит метод cancel(), изменяющий состояние флага canceled, и метод isCanceled(), проверяющий, был ли объект отменен. Поскольку флаг canceled относится к типу boolean, простые операции вроде присваивания и возврата значения выполняются атомарно, то есть без возможности прерывания, и вы не увидите поле в промежуточном состоянии между этими простыми операциями. Смысл ключевого слова volatile будет объяснен позже в этой главе.
Для тестирования IntGenerator можно воспользоваться следующим классом EvenChecker:
//: concurrency/EvenChecker.java
import java.util.concurrent.*;
public class EvenChecker implements Runnable {
private IntGenerator generator;
private final int id;
public EvenChecker(IntGenerator g, int ident) {
generator = g;
id = ident;
}
public void run() {
while(!generator.isCanceled()) {
int val = generator.next();
if(val % 2 != 0) {
System.out.println(val + " not even!");
generator.cancel(); // Отмена всех EvenChecker
}
}
}
// Тестирование произвольного типа IntGenerator:
public static void test(IntGenerator gp, int count) {
System.out.println("Press Control-C to exit");
ExecutorService exec = Executors.newCachedThreadPool();
for(int i = 0; i < count; i++)
exec.execute(new EvenChecker(gp, i));
exec.shutdown();
}
// Значение по умолчанию для count:
public static void test(IntGenerator gp) {
test(gp, 10);
}
}
Как видно из run(), все задачи EvenChecker, зависящие от объекта IntGenerator, проверяют, не были ли они отменены. При таком подходе задачи, совместно использующие общий ресурс (IntGenerator), наблюдают за ресурсом, ожидая от него сигнала завершения. Тем самым устраняется так называемая «ситуация гонки», когда две и более задачи торопятся отреагировать на некоторое условие; это приводит к возникновению конфликтов или получению других некорректных результатов. Будьте внимательны, постарайтесь продумать все возможные сбои в системах с параллельным выполнением и защититься от них. Например, задача не может зависеть от другой задачи, потому что порядок завершения задач не гарантирован. Зависимость задач от объекта, не являющегося задачей, устраняет потенциальную «ситуацию гонки».
Метод test() настраивает и тестирует произвольный тип IntGenerator, запуская несколько задач EvenChecker с общим IntGenerator. Если IntGenerator приводит к сбою, test() сообщает о происходящем и возвращает управление. В противном случае его следует завершить вручную клавишами Ctrl+C.
Задачи EvenChecker постоянно читают и проверяют значения, полученные от IntGenerator. Если generator.isCanceled() равен true, run() возвращает управление; это сообщает Executor в EvenChecker.test() о том, что задача завершена. Любая задача EvenChecker может вызвать cancel() для связанного с ней IntGenerator, в результате чего все остальные EvenChecker, использующие IntGenerator, будут корректно завершены. Как будет показано далее в этой главе, в Java существуют и более общие механизмы завершения потоков.
В первом варианте IntGenerator, который мы рассмотрим, next() выдает серию четных значений:
//: concurrency/EvenGenerator.java
// Конфликт потоков.
public class EvenGenerator extends IntGenerator {
private int currentEvenValue = 0;
public int next() {
++currentEvenValue; // Опасная точка!
++currentEvenValue;
return currentEvenValue;
}
public static void main(String[] args) {
EvenChecker.test(new EvenGenerator());
}
}
<spoiler text="Output:">
Нажмите Control-С, чтобы завершить программу
89476993 not even!
89476993 not even!
</spoiler>
Одна задача может вызвать next() после того, как другая задача выполнит первый инкремент currentEvenValue, но до второго инкремента (в позиции, помеченной комментарием «Опасная точка!»). При этом значение оказывается в «некорректном» состоянии. Чтобы доказать, что такое возможно, EvenChecker. test() создает группу объектов EvenChecker, которые непрерывно читают результаты EvenGenerator и проверяют их на четность. При обнаружении нечетного числа выводится сообщение об ошибке, и программа завершается.
Сбой рано или поздно произойдет, потому что задачи EvenChecker могут обратиться к информации EvenGenerator в «некорректном» состоянии. Впрочем, проблема может проявиться лишь после многих циклов отработки EvenGenerator; все зависит от особенностей операционной системы и других подробностей реализации. Чтобы ускорить наступление сбоя, попробуйте разместить вызов yield() между инкрементами. В этом и состоит одна из проблем многопоточного программирования: программа, содержащая ошибку, на первый взгляд работает вполне нормально — а все потому, что вероятность сбоя очень мала.
Также стоит учитывать, что сама операция инкремента состоит из нескольких шагов и может быть прервана планировщиком потоков в ходе выполнения — другими словами, инкремент в Java не является атомарной операцией. Даже простое приращение переменной может оказаться небезопасным, если не организовать защиту задачи.
Разрешение конфликтов доступа
Предыдущий пример показательно иллюстрирует основную проблему потоков: вы никогда не знаете, когда поток будет выполняться. Вообразите, что вы сидите за столом с вилкой в руках, собираетесь съесть последний, самый лакомый кусочек, который лежит на тарелке прямо перед вами. Но, как только вы тянетесь к еде вилкой, она исчезает (как ваш поток был внезапно приостановлен, и другой поток не постеснялся «стянуть» у вас еду). Вот такую проблему нам приходится решать при написании выполняемых одновременно и использующих общие ресурсы программ. Чтобы многопоточность работала, необходим механизм, предотвращающий возможность состязания двух потоков за один ресурс (по крайней мере, во время критичных операций).
Предотвратить такое столкновение интересов несложно — надо блокировать ресурс для других потоков, пока он находится в ведении одного потока. Первый поток, получивший доступ к ресурсу, вешает на него «замок», и тогда все остальные потоки не смогут получить этот ресурс до тех пор, пока «замок» не будет снят, и только после этого другой поток овладеет ресурсом и заблокирует его, и т. д. Если переднее сиденье машины является для детей ограниченным ресурсом, то ребенок, первым крикнувший «Чур, я спереди!», отстоял свое право на «блокировку».
Для решения проблемы соперничества потоков фактически все многопоточные схемы синхронизируют доступ к разделяемым ресурсам. Это означает, что доступ к разделяемому ресурсу в один момент времени может получить только один поток. Чаще всего это выполняется помещением фрагмента кода в секцию блокировки так, что одновременно пройти по этому фрагменту кода может только один поток. Поскольку такое предложение блокировки дает эффект взаимного исключения, этот механизм часто называют мьютексом (MUTual Exclusion).
Вспомните свою ванную комнату — несколько людей (потоки) могут захотеть эксклюзивно владеть ей (разделяемым ресурсом). Чтобы получить доступ в ванную, человек стучится в дверь, желая проверить, не занята ли она. Если ванная свободна, он входит в нее и запирает дверь. Любой другой поток, желающий оказаться внутри, «блокируется» в этом действии, и ему приходится ждать у двери, пока ванная не освободится.
Аналогия немного нарушается, когда дверь в ванную комнату снова открывается, и приходит время передать доступ другому потоку. Как люди на самом деле не становятся в очередь, так и здесь мы точно не знаем, кто «зайдет в ванную» следующим, потому что поведение планировщика потоков недетерминировано. Существует гипотетическая группа блокированных потоков, толкущихся у двери, и, когда поток, который занимал «ванную», разблокирует ее и уйдет, тот поток, что окажется ближе всех к двери, «войдет» в нее. Как уже было замечено, планировщику можно давать подсказки методами yield() и setPriority(), но эти подсказки необязательно будут иметь эффект, в зависимости от вашей платформы и реализации виртуальной машины JVM.
В Java есть встроенная поддержка для предотвращения конфликтов в виде ключевого слова synchronized. Когда поток желает выполнить фрагмент кода, охраняемый словом synchronized, он проверяет, доступен ли семафор, получает доступ к семафору, выполняет код и освобождает семафор.
Разделяемый ресурс чаще всего является блоком памяти, представляющим объект, но это также может быть файл, порт ввода/вывода или устройство (скажем, принтер). Для управления доступом к разделяемому ресурсу вы сначала помещаете его внутрь объекта. После этого любой метод, получающий доступ к ресурсу, может быть объявлен как synchronized. Это означает, что, если задача выполняется внутри одного из объявленных как synchronized методов, все остальные потоки не сумеют зайти ни в какой synchronized-метод до тех пор, пока первый поток не вернется из своего вызова.
Как известно, в окончательной версии кода поля класса обычно объявляются закрытыми (private), а доступ к их памяти осуществляется только посредством методов. Чтобы предотвратить конфликты, объявите такие методы синхронизированными (с помощью ключевого слова synchronized):
synchronized void f() { /* .. */ }
synchronized void g(){ /*.. */ }
Каждый объект содержит объект простой блокировки (также называемый монитором). При вызове любого синхронизированного (synchronized) метода объект переходит в состояние блокировки, и пока этот метод не закончит свою работу и не снимет блокировку, другие синхронизированные методы для объекта не могут быть вызваны. В только что рассмотренном примере, если для объекта вызывается метод f(), метод g() не будет вызван до тех пор, пока метод f() не завершит свою работу и не сбросит блокировку. Таким образом, монитор совместно используется всеми синхронизированными методами определенного объекта и предотвращает использование общей памяти несколькими потоками одновременно.
Один поток может блокировать объект многократно. Это происходит, когда метод вызывает другой метод того же объекта, который, в свою очередь, вызывает еще один метод того же объекта, и т. д. Виртуальная машина JVM следит за тем, сколько раз объект был заблокирован. Если объект не блокировался, его счетчик равен нулю. Когда задача захватывает объект в первый раз, счетчик увеличивается до единицы. Каждый раз, когда задача снова овладевает объектом блокировки того же объекта, счетчик увеличивается. Естественно, что все это разрешается только той задаче, которая инициировала первичную блокировку. При выходе задачи из синхронизированного метода счетчик уменьшается на единицу до тех пор, пока не делается равным нулю, после чего объект блокировки данного объекта становится доступен другим потокам.
Также существует отдельный монитор для класса (часть объекта Class), который следит за тем, чтобы статические (static) синхронизированные (synchronized) методы не использовали одновременно общие статические данные класса.
Синхронизация для примера EvenGenerator
Включив в программу EvenGenerator.java поддержку synchronized, мы можем предотвратить нежелательный доступ со стороны потоков:
//: concurrency/SynchronizedEvenGenerator.java
// Упрощение работы с мьютексами с использованием
// ключевого слова synchronized.
// {RunByHand}
public class
SynchronizedEvenGenerator extends IntGenerator {
private int currentEvenValue = 0;
public synchronized int next() {
++currentEvenValue;
Thread.yield(); // Ускоряем сбой
++currentEvenValue;
return currentEvenValue;
}
public static void main(String[] args) {
EvenChecker.test(new SynchronizedEvenGenerator());
}
}
Вызов Thread.yield() между двумя инкрементами повышает вероятность переключения контекста при нахождении currentEvenValue в нечетном состоянии. Так как мьютекс позволяет выполнять критическую секцию не более чем одной задаче, сбоев не будет.
Первая задача, входящая в next(), устанавливает блокировку, а все остальные задачи, пытающиеся ее установить, блокируются до момента снятия блокировки первой задачей. В этой точке механизм планирования выбирает другую задачу, ожидающую блокировки. Таким образом, в любой момент времени только одна задача может проходить по коду, защищенному мьютексом.
Объекты Lock
Библиотека Java SE5 java.utiLconcurrent также содержит явный механизм управления мьютексами, определенный в java.util.concurrent.locks. Объект Lock можно явно создать в программе, установить или снять блокировку; правда, полученный код будет менее элегантным, чем при использовании встроенной формы. С другой стороны, он обладает большей гибкостью при решении некоторых типов задач. Вот как выглядит пример SynchronizedEvenGenerator.java с явным использованием объектов Lock:
//: concurrency/MutexEvenGenerator.java
// Предотвращение потоковых конфликтов с использованием мьютексов.
// {RunByHand}
import java.util.concurrent.locks.*;
public class MutexEvenGenerator extends IntGenerator {
private int currentEvenValue = 0;
private Lock lock = new ReentrantLock();
public int next() {
lock.lock();
try {
++currentEvenValue;
Thread.yield(); // Ускоряем сбой
++currentEvenValue;
return currentEvenValue;
} finally {
lock.unlock();
}
}
public static void main(String[] args) {
EvenChecker.test(new MutexEvenGenerator());
}
}
MutexEvenGenerator добавляет мьютекс с именем lock и использует методы lock() и unlock() для создания критической секции в next(). При использовании объектов Lock следует применять идиому, показанную в примере: сразу же за вызовом lock() необходимо разместить конструкцию try-finally, при этом в секцию finally включается вызов unlock() — только так можно гарантировать снятие блокировки.
Хотя try-finally требует большего объема кода, чем ключевое слово synchronized, явное использование объектов Lock обладает своими преимуществами. При возникновении проблем с ключевым словом synchronized происходит исключение, но вы не получите возможность выполнить завершающие действия, чтобы сохранить корректное состояние системы. При работе с объектами Lock можно сделать все необходимое в секции finally.
В общем случае использование synchronized уменьшает объем кода, а также радикально снижает вероятность ошибки со стороны программиста, поэтому явные операции с объектами Lock обычно выполняются только при решении особых задач. Например, с ключевым словом synchronized нельзя попытаться получить блокировку с неудачным исходом или попытаться получить блокировку в течение некоторого промежутка времени с последующим отказом — в подобных случаях приходится использовать библиотеку concurrent:
//: concurrency/AttemptLocking.java
// Объекты Lock из библиотеки concurrent делают возможными
// попытки установить блокировку в течение некоторого времени
import java.util.concurrent.*;
import java.util.concurrent.locks.*;
public class AttemptLocking {
private ReentrantLock lock = new ReentrantLock();
public void untimed() {
boolean captured = lock.tryLock();
try {
System.out.println("tryLock(): " + captured);
} finally {
if(captured)
lock.unlock();
}
}
public void timed() {
boolean captured = false;
try {
captured = lock.tryLock(2, TimeUnit.SECONDS);
} catch(InterruptedException e) {
throw new RuntimeException(e);
}
try {
System.out.println("tryLock(2, TimeUnit.SECONDS): " +
captured);
} finally {
if(captured)
lock.unlock();
}
}
public static void main(String[] args) {
final AttemptLocking al = new AttemptLocking();
al.untimed(); // True -- блокировка доступна
al.timed(); // True -- блокировка доступна
// Теперь создаем отдельную задачу для установления блокировки:
new Thread() {
{ setDaemon(true); }
public void run() {
al.lock.lock();
System.out.println("acquired");
}
}.start();
Thread.yield(); // Даем возможность 2-й задаче
al.untimed(); // False -- блокировка захвачена задачей
al.timed(); // False -- блокировка захвачена задачей
}
}
<spoiler text="Output:">
tryLock(): true
tryLock(2, TimeUnit.SECONDS): true
acquired
tryLock(): false
tryLock(2, TimeUnit.SECONDS): false
</spoiler>
Класс ReentrantLock делает возможной попытку получения блокировки с последующим отказом от нее. Таким образом, если кто-то уже захватил блокировку, вы можете отказаться от своих намерений (вместо того, чтобы дожидаться ее освобождения). В методе timed() делается попытка установления блокировки, которая может завершиться неудачей через 2 секунды (обратите внимание на использование класса Java SE5 TimeUnit для определения единиц времени). В main() отдельный объект Thread создается в виде безымянного класса и устанавливает блокировку, чтобы методы untimed() и timed() могли с чем-то конкурировать.
Атомарные операции и ключевое слово volatile
В дискуссиях, посвященных механизму потоков в Java, часто можно услышать такое утверждение: «Атомарные операции не требуют синхронизации». Атомарная операция — это операция, которую не может прервать планировщик потоков — если она начинается, то продолжается до завершения, без возможности переключения контекста (переключения выполнения на другой поток). Не полагайтесь на атомарность, она ненадежна и опасна — используйте ее вместо синхронизации только в том случае, если вы являетесь экспертом в области синхронизации или, по крайней мере, можете получить помощь от такого эксперта.
Атомарные операции, упоминаемые в таких дискуссиях, включают в себя «простые операции» с примитивными типами, за исключением long и double.
Чтение и запись примитивных переменных гарантированно выполняются как атомарные (неделимые) операции. С другой стороны, JVM разрешается выполнять чтение и запись 64-разрядных величин (long и double) в виде двух раздельных 32-разрядных операций, с ненулевой вероятностью переключения контекста в ходе чтения или записи. Для достижения атомарности (при простом присваивании и возврате значений) можно определить типы long и double с модификатором volatile (учтите, что до выхода Java SE5 ключевое слово volatile не всегда работало корректно). Некоторые реализации JVM могут предоставлять более сильные гарантии, но вы не должны полагаться на платформенно-специфические возможности.
В многопроцессорных системах (которые в наши дни представлены многоядерными процессорами, то есть несколькими процесорами на одном чипе) видимость (visibility) играет гораздо более важную роль, чем в однопроцессорных системах. Изменения, вносимые одной задачей, — даже атомарные в смысле невозможности прерывания — могут оставаться невидимыми для других задач (например, если изменения временно хранятся в локальном кэше процессора). Таким образом, разные задачи будут по-разному воспринимать состояние приложения. Механизм синхронизации обеспечивает распространение видимости изменений, вносимых одной задачей в многопроцессорной системе, по всему приложению. Без синхронизации невозможно заранее предсказать, когда именно изменения станут видимыми.
Ключевое слово volatile обеспечивает видимость в рамках приложения. Если поле объявлено как volatile, это означает, что сразу же после записи в поле изменение будет отражено во всех последующих операциях чтения. Утверждение истинно даже при участии локальных кэшей — поля volatile немедленно записываются в основную память, и дальнейшее чтение происходит из основной памяти.
Если слепо следовать концепции атомарности, можно заметить, что метод getValue() в следующем примере вроде бы отвечает этому описанию:
//: concurrency/AtomicityTest.java
import java.util.concurrent.*;
public class AtomicityTest implements Runnable {
private int i = 0;
public int getValue() { return i; }
private synchronized void evenIncrement() { i++; i++; }
public void run() {
while(true)
evenIncrement();
}
public static void main(String[] args) {
ExecutorService exec = Executors.newCachedThreadPool();
AtomicityTest at = new AtomicityTest();
exec.execute(at);
while(true) {
int val = at.getValue();
if(val % 2 != 0) {
System.out.println(val);
System.exit(0);
}
}
}
}
<spoiler text="Output:"> (Sample)
191583767
</spoiler>
Однако программа находит нечетные значения и завершается. Хотя return і и является атомарной операцией, отсутствие synchronized позволит читать значение объекта, когда он находится в нестабильном промежуточном состоянии. Вдобавок переменная і не объявлена как volatile, а это приведет к проблемам с видимостью. Оба метода, getValue() и evenIncrement(), должны быть объявлены синхронизируемыми. Только эксперты в области параллельных вычислений могут пытаться применять оптимизацию в подобных случаях.
В качестве второго примера рассмотрим кое-что еще более простое: класс, производящий серийные номера. Каждый раз при вызове метода nextSerialNumber() он должен возвращать уникальное значение:
//: concurrency/SerialNumberGenerator.java
public class SerialNumberGenerator {
private static volatile int serialNumber = 0;
public static int nextSerialNumber() {
return serialNumber++; // Операция не является потоково-безопасной
}
}
Представить себе класс тривиальнее SerialNumberGenerator вряд ли можно, и если вы ранее работали с языком C++ или имеете другие низкоуровневые навыки, то, видимо, ожидаете, что операция инкремента будет атомарной, так как инкремент обычно реализуется в виде одной инструкции микропроцессора. Однако в виртуальной машине Java инкремент не является атомарным и состоит из чтения и записи, соответственно, ниша для проблем с потоками найдется даже в такой простой программе.
Поле serialNumber объявлено как volatile потому, что каждый поток обладает локальным стеком и поддерживает в нем копии некоторых локальных переменных. Если вы объявляете переменную как volatile, то тем самым указываете компилятору не проводить оптимизацию, а это приведет к удалению чтения и записи, удерживающих поле в соответствии с локальными данными потока. Операции чтения и записи осуществляются непосредственно с памятью без кэширования. Кроме того, volatile не позволяет компилятору изменять порядок обращений с целью оптимизации. И все же присутствие volatile не влияет на тот факт, что инкремент не является атомарной операцией.
Для тестирования нам понадобится множество, которое не потребует переизбытка памяти в том случае, если обнаружение проблемы отнимет много времени. Приведенный далее класс CircularSet многократно использует память, в которой хранятся целые числа (int); предполагается, что к тому моменту, когда запись в множество начинается по новому кругу, вероятность конфликта
с перезаписанными значениями минимальна. Методы add() и contains() объявлены как synchronized, чтобы избежать коллизий:
//: concurrency/SerialNumberChecker.java
// Кажущиеся безопасными операции с появлением потоков
// перестают быть таковыми...
// {Args: 4}
import java.util.concurrent.*;
// Reuses storage so we don't run out of memory:
class CircularSet {
private int[] array;
private int len;
private int index = 0;
public CircularSet(int size) {
array = new int[size];
len = size;
// Инициализируем значением, которое не производится
// классом SerialNumberGenerator:
for(int i = 0; i < size; i++)
array[i] = -1;
}
public synchronized void add(int i) {
array[index] = i;
// Возврат индекса к началу с записью поверх старых значений:
index = ++index % len;
}
public synchronized boolean contains(int val) {
for(int i = 0; i < len; i++)
if(array[i] == val) return true;
return false;
}
}
public class SerialNumberChecker {
private static final int SIZE = 10;
private static CircularSet serials =
new CircularSet(1000);
private static ExecutorService exec =
Executors.newCachedThreadPool();
static class SerialChecker implements Runnable {
public void run() {
while(true) {
int serial =
SerialNumberGenerator.nextSerialNumber();
if(serials.contains(serial)) {
System.out.println("Duplicate: " + serial);
System.exit(0);
}
serials.add(serial);
}
}
}
public static void main(String[] args) throws Exception {
for(int i = 0; i < SIZE; i++)
exec.execute(new SerialChecker());
// Остановиться после n секунд при наличии аргумента:
if(args.length > 0) {
TimeUnit.SECONDS.sleep(new Integer(args[0]));
System.out.println("No duplicates detected");
System.exit(0);
}
}
}
<spoiler text="Output:"> (Sample)
Duplicate: 8468656
</spoiler>
В классе SerialNumberChecker содержится статическое поле CircuLarSet, хранящее все серийные номера, и вложенный поток Thread, который получает эти номера и удостоверяется в их уникальности. Создав несколько потоков, претендующих на серийные номера, вы обнаружите, что какой-нибудь из них довольно быстро получит уже имеющийся номер (заметьте, что на вашей машине программа может и не обнаружить конфликт, но на многопроцессорной системе она успешно их нашла). Для решения проблемы добавьте к методу nextSerialNumber() слово synchronized.
Предполагается, что безопасными атомарными операциями являются чтение и присвоение примитивов. Однако, как мы увидели в программе AtomicityTest.java, все так же просто использовать атомарную операцию для объекта, который находится в нестабильном промежуточном состоянии, так что ожидать, что какие-то предположения оправдаются, опасно и ненадежно.
Атомарные классы
В Java SE5 появились специальные классы для выполнения атомарных операций с переменными — Atomiclnteger, AtomicLong, AtomicReference и т. д. Эти классы содержат атомарную операцию условного обновления в форме
boolean compareAndSer(expectedValue, updateValue);
Эти классы предназначены для оптимизации с целью использования атомарности на машинном уровне на некоторых
современных процессорах, поэтому в общем случае вам они не понадобятся. Иногда они применяются и в повседневном программировании, но только при оптимизации производительности. Например, версия AtomicityTest.java, переписанная для использования AtomicInteger, выглядит так:
//: concurrency/AtomicIntegerTest.java
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
import java.util.*;
public class AtomicIntegerTest implements Runnable {
private AtomicInteger i = new AtomicInteger(0);
public int getValue() { return i.get(); }
private void evenIncrement() { i.addAndGet(2); }
public void run() {
while(true)
evenIncrement();
}
public static void main(String[] args) {
new Timer().schedule(new TimerTask() {
public void run() {
System.err.println("Aborting");
System.exit(0);
}
}, 5000); // Завершение через 5 секунд
ExecutorService exec = Executors.newCachedThreadPool();
AtomicIntegerTest ait = new AtomicIntegerTest();
exec.execute(ait);
while(true) {
int val = ait.getValue();
if(val % 2 != 0) {
System.out.println(val);
System.exit(0);
}
}
}
}
Здесь вместо ключевого слова synchronized используется AtomicInteger. Так как сбой в программе не происходит, в программу включается таймер, автоматически завершающий ее через 5 секунд.
Вот как выглядит пример MutexEvenGenerator.java, переписанный для использования класса Atomiclnteger:
//: concurrency/AtomicEvenGenerator.java
// Атомарные классы иногда используются в обычном коде.
// {RunByHand}
import java.util.concurrent.atomic.*;
public class AtomicEvenGenerator extends IntGenerator {
private AtomicInteger currentEvenValue =
new AtomicInteger(0);
public int next() {
return currentEvenValue.addAndGet(2);
}
public static void main(String[] args) {
EvenChecker.test(new AtomicEvenGenerator());
}
}
Стоит еще раз подчеркнуть, что классы Atomic проектировались для построения классов из java.util.concurrent. Используйте их в своих программах только в особых случаях и только тогда, когда вы твердо уверены, что это не создаст новых проблем. В общем случае безопаснее использовать блокировки (с ключевым словом synchronized или явным созданием объектов Lock).
Критические секции
Иногда необходимо предотвратить доступ нескольких потоков только к части кода, а не к методу в целом. Фрагмент кода, который изолируется таким способом, называется критической секцией (critical section), для его создания также применяется ключевое слово synchronized. На этот раз слово synchronized определяет объект, блокировка которого должна использоваться для синхронизации последующего фрагмента кода:
synchronized(синхронизируемыйОбьект) {
// К такому коду доступ может получить
// одновременно только один поток
}
Такая конструкция иначе называется синхронизированной блокировкой (synchronized block); перед входом в нее необходимо получить блокировку для syncObject. Если блокировка уже предоставлена другому потоку, вход в последующий фрагмент кода запрещается до тех пор, пока блокировка не будет снята.
Следующий пример сравнивает два подхода к синхронизации, показывая, насколько увеличивается время, предоставляемое потокам для доступа к объекту при использовании синхронизированной блокировки вместо синхронизации методов. Вдобавок он демонстрирует, как незащищенный класс может «выжить» в многозадачной среде, если он управляется и защищается другим классом:
//: concurrency/CriticalSection.java
// Синхронизация блоков вместо целых методов. Также демонстрирует защиту
// неприспособленного к многопоточности класса другим классом
// with a thread-safe one.
package concurrency;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
import java.util.*;
class Pair { // Not thread-safe
private int x, y;
public Pair(int x, int y) {
this.x = x;
this.y = y;
}
public Pair() { this(0, 0); }
public int getX() { return x; }
public int getY() { return y; }
public void incrementX() { x++; }
public void incrementY() { y++; }
public String toString() {
return "x: " + x + ", y: " + y;
}
public class PairValuesNotEqualException
extends RuntimeException {
public PairValuesNotEqualException() {
super("Pair values not equal: " + Pair.this);
}
}
// Произвольный инвариант - обе переменные должны быть:
public void checkState() {
if(x != y)
throw new PairValuesNotEqualException();
}
}
// Защита класса Pair внутри приспособленного к потокам класса:
abstract class PairManager {
AtomicInteger checkCounter = new AtomicInteger(0);
protected Pair p = new Pair();
private List<Pair> storage =
Collections.synchronizedList(new ArrayList<Pair>());
public synchronized Pair getPair() {
// Создаем копию, чтобы сохранить оригинал в безопасности:
return new Pair(p.getX(), p.getY());
}
// Предполагается, что операция занимает некоторое время
protected void store(Pair p) {
storage.add(p);
try {
TimeUnit.MILLISECONDS.sleep(50);
} catch(InterruptedException ignore) {}
}
public abstract void increment();
}
// Синхронизация всего метода:
class PairManager1 extends PairManager {
public synchronized void increment() {
p.incrementX();
p.incrementY();
store(getPair());
}
}
// Использование критической секции:
class PairManager2 extends PairManager {
public void increment() {
Pair temp;
synchronized(this) {
p.incrementX();
p.incrementY();
temp = getPair();
}
store(temp);
}
}
class PairManipulator implements Runnable {
private PairManager pm;
public PairManipulator(PairManager pm) {
this.pm = pm;
}
public void run() {
while(true)
pm.increment();
}
public String toString() {
return "Pair: " + pm.getPair() +
" checkCounter = " + pm.checkCounter.get();
}
}
class PairChecker implements Runnable {
private PairManager pm;
public PairChecker(PairManager pm) {
this.pm = pm;
}
public void run() {
while(true) {
pm.checkCounter.incrementAndGet();
pm.getPair().checkState();
}
}
}
public class CriticalSection {
// Сравнение двух подходов:
static void
testApproaches(PairManager pman1, PairManager pman2) {
ExecutorService exec = Executors.newCachedThreadPool();
PairManipulator
pm1 = new PairManipulator(pman1),
pm2 = new PairManipulator(pman2);
PairChecker
pcheck1 = new PairChecker(pman1),
pcheck2 = new PairChecker(pman2);
exec.execute(pm1);
exec.execute(pm2);
exec.execute(pcheck1);
exec.execute(pcheck2);
try {
TimeUnit.MILLISECONDS.sleep(500);
} catch(InterruptedException e) {
System.out.println("Sleep interrupted");
}
System.out.println("pm1: " + pm1 + "\npm2: " + pm2);
System.exit(0);
}
public static void main(String[] args) {
PairManager
pman1 = new PairManager1(),
pman2 = new PairManager2();
testApproaches(pman1, pman2);
}
}
<spoiler text="Output:"> (Sample)
pm1: Pair: x: 15, y: 15 checkCounter = 272565
pm2: Pair: x: 16, y: 16 checkCounter = 3956974
</spoiler>
Как было отмечено, класс Pair не приспособлен к работе с потоками, поскольку его инвариант (предположительно произвольный) требует равенства обоих переменных. Вдобавок, как мы уже видели в этой главе, операции инкремента небезопасны в отношении к потокам, и, так как ни один из методов не был объявлен как synchronized, мы не можем считать, что объект Pair останется неповрежденным в многопоточной программе.
Представьте, что вы получили готовый класс Pair, который должен работать в многопоточных условиях. Класс PairManager хранит объекты Pair и управляет любым доступом к ним. Заметьте, что единственными открытыми (public) методами являются getPair(), объявленный как synchronized, и абстрактный метод doTask(). Синхронизация этого метода будет осуществлена при его реализации.
Структура класса PairManager, в котором часть функциональности базового класса реализуется одним или несколькими абстрактными методами, определенными производными классами, называется на языке паттернов проектирования «шаблонным методом». Паттерны проектирования позволяют инкапсулировать изменения в коде — здесь изменяющаяся часть представлена методом increment(). В классе PairManager1 метод increment() полностью синхронизирован, в то время как в классе PairManager2 только часть его была синхронизирована посредством синхронизируемой блокировки. Обратите внимание еще раз, что ключевые слова synchronized не являются частью сигнатуры метода и могут быть добавлены во время переопределения.
Метод store() добавляет объект Pair в синхронизированный контейнер ArrayList, поэтому операция является потоково-безопасной. Следовательно, в защите он не нуждается, поэтому его вызов размещен за пределами синхронизируемого блока.
Класс PairManipulator создается для тестирования двух разновидностей PairManager: метод increment() вызывается в задаче в то время, как в другой задаче работает PairChecker. Метод main() создает два объекта PairManipulator и дает им поработать в течение некоторого времени, после чего выводятся результаты по каждому PairManipulator.
Для создания критических секций также можно воспользоваться явно созданными объектами Lock:
//: concurrency/ExplicitCriticalSection.java
// Использование объектов Lock для создания критических секций..
package concurrency;
import java.util.concurrent.locks.*;
// Синхронизация целого метода:
class ExplicitPairManager1 extends PairManager {
private Lock lock = new ReentrantLock();
public synchronized void increment() {
lock.lock();
try {
p.incrementX();
p.incrementY();
store(getPair());
} finally {
lock.unlock();
}
}
}
// Использование критической секции:
class ExplicitPairManager2 extends PairManager {
private Lock lock = new ReentrantLock();
public void increment() {
Pair temp;
lock.lock();
try {
p.incrementX();
p.incrementY();
temp = getPair();
} finally {
lock.unlock();
}
store(temp);
}
}
public class ExplicitCriticalSection {
public static void main(String[] args) throws Exception {
PairManager
pman1 = new ExplicitPairManager1(),
pman2 = new ExplicitPairManager2();
CriticalSection.testApproaches(pman1, pman2);
}
}
<spoiler text="Output:"> (Sample)
pm1: Pair: x: 15, y: 15 checkCounter = 174035
pm2: Pair: x: 16, y: 16 checkCounter = 2608588
</spoiler>
В программе создаются новые типы PairManager с явным использованием объектов Lock. ExplicitPairManager2 демонстрирует создание критической секции с использованием объекта Lock; вызов store() находится вне критической секции.
Синхронизация по другим объектам
Блоку synchronized необходимо передать объект, который будет использоваться для синхронизации. Чаще всего наиболее естественно передавать текущий объект, для которого был вызван метод synchronized(this), и именно такой подход применен в классе PairManager2. Таким образом, при входе в синхронизируемый блок другие синхронизированные методы объекта вызвать будет нельзя. Действие синхронизации по this фактически заключается в сужении области синхронизации.
Иногда вам нужно что-то иное, и в таких ситуациях вы создаете отдельный объект и выполняете синхронизацию, привлекая его. В таких случаях необходимо позаботиться о том, чтобы все операции синхронизировались по одному и тому же объекту. Следующий пример показывает, как два потока входят в объект, когда методы этого объекта синхронизированы различными блокировками:
//: concurrency/SyncObject.java
// Синхронизация по другому объекту.
import static net.mindview.util.Print.*;
class DualSynch {
private Object syncObject = new Object();
public synchronized void f() {
for(int i = 0; i < 5; i++) {
print("f()");
Thread.yield();
}
}
public void g() {
synchronized(syncObject) {
for(int i = 0; i < 5; i++) {
print("g()");
Thread.yield();
}
}
}
}
public class SyncObject {
public static void main(String[] args) {
final DualSynch ds = new DualSynch();
new Thread() {
public void run() {
ds.f();
}
}.start();
ds.g();
}
}
<spoiler text="Output:"> (Sample)
g()
f()
g()
f()
g()
f()
g()
f()
g()
f()
</spoiler>
Метод f() класса DualSync синхронизируется по объекту this (синхронизируя метод целиком), а метод g() использует синхронизацию посредством объекта syncObject. Таким образом, два варианта синхронизации независимы. Демонстрируется этот факт методом main(), в котором создается поток Threadс вызовом метода f(). Поток main() после этого вызывает метод g(). Из результата работы программы видно, что оба метода работают одновременно и ни один из них не блокируется соседом.
Локальная память потока
Второй механизм предотвращения конфликтов доступа к общим ресурсам основан на исключении их совместного использования. Локальная память потока представляет собой механизм автоматического выделения разных областей памяти для одной переменной во всех потоках, использующих объект. Следовательно, если пять потоков используют объект с переменной х, для х будет сгенерировано пять разных областей памяти. Фактически поток связывается с некоторым состоянием.
За выделение локальной памяти потоков и управление ею отвечает класс java.lang.ThreadLocal:
//: concurrency/ThreadLocalVariableHolder.java
// Автоматическое выделение собственной памяти каждому потоку.
import java.util.concurrent.*;
import java.util.*;
class Accessor implements Runnable {
private final int id;
public Accessor(int idn) { id = idn; }
public void run() {
while(!Thread.currentThread().isInterrupted()) {
ThreadLocalVariableHolder.increment();
System.out.println(this);
Thread.yield();
}
}
public String toString() {
return "#" + id + ": " +
ThreadLocalVariableHolder.get();
}
}
public class ThreadLocalVariableHolder {
private static ThreadLocal<Integer> value =
new ThreadLocal<Integer>() {
private Random rand = new Random(47);
protected synchronized Integer initialValue() {
return rand.nextInt(10000);
}
};
public static void increment() {
value.set(value.get() + 1);
}
public static int get() { return value.get(); }
public static void main(String[] args) throws Exception {
ExecutorService exec = Executors.newCachedThreadPool();
for(int i = 0; i < 5; i++)
exec.execute(new Accessor(i));
TimeUnit.SECONDS.sleep(3); // Небольшая задержка
exec.shutdownNow(); // Выход из всех объектов Accessor
}
}
<spoiler text="Output:"> (Sample)
#0: 9259
#1: 556
#2: 6694
#3: 1862
#4: 962
#0: 9260
#1: 557
#2: 6695
#3: 1863
#4: 963
...
</spoiler>
Объекты ThreadLocal обычно хранятся в статических полях. Если вы создаете объект ThreadLocal, для обращения к содержимому объекта можно использовать только методы get() и set(). Метод get() возвращает копию объекта, ассоциированного с потоком, a set() сохраняет свой аргумент в объекте потока, возвращая ранее хранившийся объект. Их использование продемонстрировано в методах increment() и get() класса ThreadLocalVariableHolder. Обратите внимание: методы increment() и get() не синхронизированы, потому что ThreadLocal не гарантирует отсутствия «ситуации гонки».
Взаимодействие между потоками
Итак, мы выяснили, что потоки способны конфликтовать друг с другом, и разобрались с тем, как предотвратить такие конфликты. Следующим шагом должно стать изучение возможностей взаимодействия между потоками. Ключевым моментом в этом процессе является подтверждение связи, безопасно реализуемое методами wait() и notify() класса Object. В многопоточной библиотеке Java SE5 также присутствуют объекты Condition с методами await() и signal(). Мы рассмотрим некоторые возникающие проблемы и их решения.
Методы wait() и notifyAII()
Метод wait() ожидает изменения некоторого условия, неподконтрольного для текущего метода. Довольно часто это условие изменяется в результате выполнения другой задачи. Активное ожидание, то есть проверка условия в цикле, нежелательно из-за неэффективного расходования вычислительных ресурсов. Таким образом, метод wait() обеспечивает механизм синхронизации действий между задачами.
Важно понять, что метод sleep() не освобождает объект блокировки. С другой стороны, метод wait() снимает блокировку с объекта, тем самым позволяя остальным потокам вызывать другие синхронизированные методы объекта во время выполнения wait(). Это очень важно, потому что обычно именно «другие» методы приводят к изменению условия и активизации приостановленной задачи.
Существует две формы метода wait(). У первой формы аргумент имеет такой же смысл, как и аргумент метода sleep(): это продолжительность интервала в миллисекундах, на который приостанавливается выполнение потока. Разница между методами состоит в следующем:
- При выполнении метода wait() блокируемый объект освобождается.
- Выйти из состояния ожидания, установленного wait(), можно двумя способами: с помощью уведомления notify() или notifyAll() либо по истечении срока ожидания.
Вторая, более распространенная форма вызывается без аргументов. Эта версия метода wait() заставит поток простаивать, пока не придет уведомление notify() или notifyAll().
Пожалуй, самое интересное в методах wait(), notify() и notifyAll() — их принадлежность к общему классу Object, а не к классу потоков Thread. Хотя это и кажется немного нелогичным — размещение чего-то, относящегося исключительно к механизму потоков, в общем базовом классе — на самом деле это решение совершенно оправдано, поскольку означенные методы манипулируют блокировками, которые являются частью любого объекта. В результате ожидание (wait()) может использоваться в любом синхронизированном методе, независимо от того, наследует ли класс от Thread или реализует Runnable. Вообще говоря, единственное место, где допустимо вызывать метод wait(), — это синхронизированный метод или блок (метод sleep() можно вызывать в любом месте, так как он не манипулирует блокировкой). Если вызвать метод wait() или notify() в обычном методе, программа скомпилируется, однако при ее выполнении возникнет исключение IllegalMonitorStateException с несколько туманным сообщением «текущий поток не является владельцем» («current thread not owner»). Это сообщение означает, что поток, востребовавший методы wait(), notify() или notifyAll(), должен быть «хозяином» блокируемого объекта (овладеть объектом блокировки) перед вызовом любого из данных методов.
Вы можете «попросить» объект провести операции с помощью его собственного объекта блокировки. Для этого необходимо сначала захватить блокировку для данного объекта. Например, если вы хотите вызвать notifyAll() для объекта х, то должны сделать это в синхронизируемом блоке, устанавливающем блокировку для х:
synchronized(x) { х.notifyAll();}
Рассмотрим простой пример. В программе WaxOMatic.java задействованы два процесса: один наносит восковую пасту на автомашину (Саr), а другой полирует ее. Задача полировки не может приступить к работе до того, как задача нанесения пасты завершит свою операцию, а задача нанесения пасты должна ждать завершения полировки, чтобы наложить следующий слой пасты. Оба класса, WaxOn и WaxOff, используют объект Саr, который приостанавливает и возобновляет задачи в ожидании изменения условия:
//: concurrency/waxomatic/WaxOMatic.java
// Простейшее взаимодействие задач.
package concurrency.waxomatic;
import java.util.concurrent.*;
import static net.mindview.util.Print.*;
class Car {
private boolean waxOn = false;
public synchronized void waxed() {
waxOn = true; // Готово к обработке
notifyAll();
}
public synchronized void buffed() {
waxOn = false; // Готово к нанесению очередного слоя
notifyAll();
}
public synchronized void waitForWaxing()
throws InterruptedException {
while(waxOn == false)
wait();
}
public synchronized void waitForBuffing()
throws InterruptedException {
while(waxOn == true)
wait();
}
}
class WaxOn implements Runnable {
private Car car;
public WaxOn(Car c) { car = c; }
public void run() {
try {
while(!Thread.interrupted()) {
printnb("Wax On! ");
TimeUnit.MILLISECONDS.sleep(200);
car.waxed();
car.waitForBuffing();
}
} catch(InterruptedException e) {
print("Exiting via interrupt");
}
print("Ending Wax On task");
}
}
class WaxOff implements Runnable {
private Car car;
public WaxOff(Car c) { car = c; }
public void run() {
try {
while(!Thread.interrupted()) {
car.waitForWaxing();
printnb("Wax Off! ");
TimeUnit.MILLISECONDS.sleep(200);
car.buffed();
}
} catch(InterruptedException e) {
print("Exiting via interrupt");
}
print("Ending Wax Off task");
}
}
public class WaxOMatic {
public static void main(String[] args) throws Exception {
Car car = new Car();
ExecutorService exec = Executors.newCachedThreadPool();
exec.execute(new WaxOff(car));
exec.execute(new WaxOn(car));
TimeUnit.SECONDS.sleep(5); // Небольшая задержка...
exec.shutdownNow(); // Прерывание всех задач
}
}
<spoiler text="Output:"> (95% match)
Wax On! Wax Off! Wax On! Wax Off! Wax On! Wax Off! Wax On! Wax Off! Wax On!
Wax Off! Wax On! Wax Off! Wax On! Wax Off! Wax On! Wax Off! Wax On! Wax Off!
Wax On! Wax Off! Wax On! Wax Off! Wax On! Wax Off! Wax On! Exiting via interrupt
Ending Wax On task
Exiting via interrupt
Ending Wax Off task
</spoiler>
Класс Саr содержит одну логическую переменную waxOn, которая описывает текущее состояние процесса полировки.
Метод waitForWaxing() проверяет флаг waxOn, и, если он равен false, вызывающая задача приостанавливается вызовом wait(). Очень важно, что это происходит в синхронизированном методе. При вызове wait() поток приостанавливается, а блокировка снимается. Последнее принципиально, потому что для безопасного изменения состояния объекта (например, для присваивания waxOn значения true, без чего приостановленная задача не сможет продолжить работу) блокировка должна быть доступна для другой задачи. В нашем примере при вызове другой задачей метода waxed(), указывающего, что пришло время что-то сделать, для задания истинного значения waxOn необходимо установить блокировку. Затем waxed() вызывает notifyAll(); задача, приостановленная вызовом wait(), активизируется. Для этого нужно сначала заново получить блокировку, освобожденную при входе в wait(). Задача не активизируется, пока блокировка не станет доступной.
Использование каналов для ввода/вывода между потоками
Часто бывает полезно организовать взаимодействие потоков посредством механизмов ввода/вывода. Библиотеки потоков могут предоставлять поддержку ввода/вывода между потоками в форме каналов (pipes). Последние представлены в стандартной библиотеке ввода/вывода Java классами PipedWriter (позволяет потоку записывать в канал) и PipedReader (предоставляет возможность другому потоку считывать из того же канала).
Простой пример взаимодействия двух потоков через канал:
//: concurrency/PipedIO.java
// Использование каналов для ввода/вывода между потоками
import java.util.concurrent.*;
import java.io.*;
import java.util.*;
import static net.mindview.util.Print.*;
class Sender implements Runnable {
private Random rand = new Random(47);
private PipedWriter out = new PipedWriter();
public PipedWriter getPipedWriter() { return out; }
public void run() {
try {
while(true)
for(char c = 'A'; c <= 'z'; c++) {
out.write(c);
TimeUnit.MILLISECONDS.sleep(rand.nextInt(500));
}
} catch(IOException e) {
print(e + " Sender write exception");
} catch(InterruptedException e) {
print(e + " Sender sleep interrupted");
}
}
}
class Receiver implements Runnable {
private PipedReader in;
public Receiver(Sender sender) throws IOException {
in = new PipedReader(sender.getPipedWriter());
}
public void run() {
try {
while(true) {
// Блокируется до появления следующего символа:
printnb("Read: " + (char)in.read() + ", ");
}
} catch(IOException e) {
print(e + " Receiver read exception");
}
}
}
public class PipedIO {
public static void main(String[] args) throws Exception {
Sender sender = new Sender();
Receiver receiver = new Receiver(sender);
ExecutorService exec = Executors.newCachedThreadPool();
exec.execute(sender);
exec.execute(receiver);
TimeUnit.SECONDS.sleep(4);
exec.shutdownNow();
}
}
<spoiler text="Output:"> (65% match)
Read: A, Read: B, Read: C, Read: D, Read: E, Read: F, Read: G, Read: H, Read: I,
Read: J, Read: K, Read: L, Read: M,
java.lang.InterruptedException: sleep interrupted Sender sleep interrupted
java.io.InterruptedIOException Receiver read exception
</spoiler>
Классы Sender и Receiver представляют задачи, которые должны взаимодействовать друг с другом. В классе Sender создается канал PipedWriter, существующий как автономный объект, однако при создании канала PipedReader в классе Receiver конструктору необходимо передать ссылку на PipedWriter. Sender записывает данные в канал Writer и бездействует в течение случайно выбранного промежутка времени. Класс Receiver не содержит вызовов sleep() или wait(), но при проведении чтения методом read() он автоматически блокируется при отсутствии данных.
Заметьте, что потоки sender и receiver запускаются из main() после того, как объекты были полностью сконструированы. Если запускать не полностью сконструированные объекты, каналы на различных платформах могут демонстрировать несогласованное поведение.
Взаимная блокировка
Итак, потоки способны перейти в блокированное состояние, а объекты могут обладать синхронизированными методами, которые запрещают использование объекта до тех пор, пока не будет снята блокировка. Возможна ситуация, в которой один поток ожидает другой поток, тот, в свою очередь, ждет освобождения еще одного потока и т. д., пока эта цепочка не замыкается на поток, который ожидает освобождения первого потока. Получается замкнутый круг потоков, которые дожидаются освобождения друг друга, и никто не может двинуться первым. Такая ситуация называется взаимной блокировкой (deadlock) (или «клинчем». — Примеч. ред.).
Если вы запускаете программу и в ней незамедлительно возникает взаимная блокировка, проблему удается немедленно отследить. По-настоящему неприятна ситуация, когда ваша программа по всем признакам работает прекрасно, но тем не менее в какой-то момент входит во взаимную блокировку. Такая опасность незаметно присутствует в программе, пока нежданно-негаданно не проявится у заказчика (и, скорее всего, легко воспроизвести эту ситуацию вам не удастся). Таким образом, тщательное проектирование программы с целью предотвращения взаимных блокировок — важнейшая часть разработки параллельных программ.
Классический пример взаимной блокировки, предложенный Эдгаром Дейкстрой — задача об обедающих философах. В стандартной формулировке говорится о пяти философах, но, как будет показано далее, допустимо любое количество. Часть времени философы проводят размышляя, часть времени проводят за едой. Когда они размышляют, то не нуждаются в общих ресурсах, но во время обеда они сидят за круглым столом с ограниченным количеством столовых приборов. В описании оригинальной задачи философы пользуются вилками, и, чтобы набрать спагетти из миски в центре стола, им требуются две вилки. Наверное, задача будет выглядеть более логично, если заменить вилки палочками для еды — очевидно, что каждому философу понадобятся две палочки.
Философы, как это часто бывает, очень бедны, и они смогли позволить себе приобрести лишь пять палочек (или в более общем виде — количество палочек совпадает с количеством философов). Последние разложены кругом по столу, между философами. Когда философу захочется поесть, ему придется взять палочку слева и справа. Если с какой-либо стороны желаемая палочка уже в руке другого философа, только что оторвавшемуся от размышлений придется подождать ее освобождения:
//: concurrency/Chopstick.java
// Палочки для обедающих философов.
public class Chopstick {
private boolean taken = false;
public synchronized
void take() throws InterruptedException {
while(taken)
wait();
taken = true;
}
public synchronized void drop() {
taken = false;
notifyAll();
}
}
Два философа (Philosopher) ни при каких условиях не смогут успешно взять (take()) одну и ту же палочку (Chopstick) одновременно. Если один философ уже взял палочку, другому философу придется подождать (wait()), пока она не будет освобождена текущим пользователем (drop()).
Когда задача Philosopher вызывает take(), она ожидает, пока флаг taken не перейдет в состояние false (то есть пока палочка не будет освобождена тем философом, который держит ее в данный момент). Далее задача устанавливает флаг taken равным true, показывая тем самым, что палочка занята. Завершив работу с Chopstick, Philosopher вызывает drop(), чтобы изменить флаг и оповестить (notifyAll()) всех остальных философов, ожидающих освобождения палочки:
//: concurrency/Philosopher.java
// Обедающий философ
import java.util.concurrent.*;
import java.util.*;
import static net.mindview.util.Print.*;
public class Philosopher implements Runnable {
private Chopstick left;
private Chopstick right;
private final int id;
private final int ponderFactor;
private Random rand = new Random(47);
private void pause() throws InterruptedException {
if(ponderFactor == 0) return;
TimeUnit.MILLISECONDS.sleep(
rand.nextInt(ponderFactor * 250));
}
public Philosopher(Chopstick left, Chopstick right,
int ident, int ponder) {
this.left = left;
this.right = right;
id = ident;
ponderFactor = ponder;
}
public void run() {
try {
while(!Thread.interrupted()) {
print(this + " " + "thinking");
pause();
// Философ проголодался
print(this + " " + "grabbing right");
right.take();
print(this + " " + "grabbing left");
left.take();
print(this + " " + "eating");
pause();
right.drop();
left.drop();
}
} catch(InterruptedException e) {
print(this + " " + "exiting via interrupt");
}
}
public String toString() { return "Philosopher " + id; }
}
В методе Philosopher.run() все философы непрерывно переходят от размышлений к еде, и наоборот. Метод pause() делает паузу случайной продолжительности, если значение ponderFactor отлично от нуля. Итак, Philosopher думает в течение случайного промежутка времени, затем пытается захватить левую и правую палочки вызовами take(), ест в течение случайного промежутка времени, а затем все повторяется.
В следующей версии программы возникает взаимная блокировка:
//: concurrency/DeadlockingDiningPhilosophers.java
// Демонстрация скрытой возможности взаимной блокировки.
// {Args: 0 5 timeout}
import java.util.concurrent.*;
public class DeadlockingDiningPhilosophers {
public static void main(String[] args) throws Exception {
int ponder = 5;
if(args.length > 0)
ponder = Integer.parseInt(args[0]);
int size = 5;
if(args.length > 1)
size = Integer.parseInt(args[1]);
ExecutorService exec = Executors.newCachedThreadPool();
Chopstick[] sticks = new Chopstick[size];
for(int i = 0; i < size; i++)
sticks[i] = new Chopstick();
for(int i = 0; i < size; i++)
exec.execute(new Philosopher(
sticks[i], sticks[(i+1) % size], i, ponder));
if(args.length == 3 && args[2].equals("timeout"))
TimeUnit.SECONDS.sleep(5);
else {
System.out.println("Press 'Enter' to quit");
System.in.read();
}
exec.shutdownNow();
}
} /* (Execute to see output) *///:~
Если философы почти не тратят время на размышления, они будут постоянно конкурировать за палочки при попытках поесть, и взаимные блокировки возникают гораздо чаще.
Первый аргумент командной строки изменяет значение ponder, влияющее на продолжительность размышлений. Если философов очень много или они проводят большую часть времени в размышлениях, взаимная блокировка может и не возникнуть, хотя ее теоретическая вероятность отлична от нуля. С нулевым аргументом взаимная блокировка наступает намного быстрее.
Объектам Chopstick не нужны внутренние идентификаторы; они идентифицируются по своей позиции в массиве sticks. Каждому конструктору Philosopher передаются ссылки на правую и левую палочки Chopstick. Последнему Philosopher в качестве правой палочки передается нулевой объект Chopstick; круг замыкается. Теперь может возникнуть ситуация, когда все философы одновременно попытаются есть, и каждый из них будет ожидать, пока сосед положит свою палочку. В программе наступает взаимная блокировка.
Если философы тратят на размышления больше времени, чем на еду, вероятность взаимной блокировки значительно снижается. Даже может возникнуть иллюзия, что программа свободна от блокировок (при ненулевом значении ponder или большом количестве объектов Philosopher), хотя на самом деле это не так. Именно этим и интересен настоящий пример: программа вроде бы ведет себя верно, тогда как на самом деле возможна взаимная блокировка.
Для решения проблемы необходимо осознавать, что тупик имеет место при стечении следующих четырех обстоятельств:
- Взаимное исключение: по крайней мере один ресурс, используемый потоками, не должен быть совместно используемым. В нашем случае одной палочкой для еды не могут одновременно есть два философа.
- По крайней мере одна задача должна удерживать ресурс и ожидать выделения ресурса, в настоящее время удерживаемого другой задачей. То есть для возникновения тупика философ должен сохранять при себе одну палочку и ожидать другую.
- Ресурс нельзя принудительно отбирать у задачи. Все процессы должны освобождать ресурсы естественным путем. Наши философы вежливы и не станут выхватывать палочки друг у друга.
- Должно произойти круговое ожидание, когда процесс ожидает ресурс, занятый другим процессом, который в свою очередь ждет ресурс, удерживаемый еще одним процессом, и т. д., пока один из процессов не будет ожидать ресурса, занятого первым процессом, что и приведет к порочному кругу. В нашем примере круговое ожидание происходит потому, что каждый философ пытается сначала получить правую палочку, а потом левую.
Так как взаимная блокировка возникает лишь при соблюдении всех перечисленных условий, для упреждения тупика достаточно нарушить всего лишь одно из них. В нашей программе проще всего нарушить четвертое условие: оно выполняется, поскольку каждый философ старается брать палочки в определенном порядке — сначала левую, потом правую. Из-за этого может возникнуть ситуация, когда каждый из них держит свою левую палочку и ждет освобождения правой, что и приводит к циклическому ожиданию. Если инициализировать последнего философа так, чтобы он сначала пытался взять левую палочку, а потом правую, взаимная блокировка станет невозможна. Это всего лишь одно решение проблемы, но вы можете предотвратить ее, нарушив одно из оставшихся условий (за подробностями обращайтесь к специализированной литературе по многозадачному программированию):
//: concurrency/FixedDiningPhilosophers.java
// Обедающие философы без взаимной блокировки.
// {Args: 5 5 timeout}
import java.util.concurrent.*;
public class FixedDiningPhilosophers {
public static void main(String[] args) throws Exception {
int ponder = 5;
if(args.length > 0)
ponder = Integer.parseInt(args[0]);
int size = 5;
if(args.length > 1)
size = Integer.parseInt(args[1]);
ExecutorService exec = Executors.newCachedThreadPool();
Chopstick[] sticks = new Chopstick[size];
for(int i = 0; i < size; i++)
sticks[i] = new Chopstick();
for(int i = 0; i < size; i++)
if(i < (size-1))
exec.execute(new Philosopher(
sticks[i], sticks[i+1], i, ponder));
else
exec.execute(new Philosopher(
sticks[0], sticks[i], i, ponder));
if(args.length == 3 && args[2].equals("timeout"))
TimeUnit.SECONDS.sleep(5);
else {
System.out.println("Press 'Enter' to quit");
System.in.read();
}
exec.shutdownNow();
}
} /* (Execute to see output) *///:~
Проследив за тем, чтобы последний философ брал и откладывал левую палочку раньше правой, мы устраняем взаимную блокировку.
В языке Java нет встроенных средств предупреждения взаимных блокировок; все зависит только от вас и аккуратности вашего кода. Вряд ли эти слова утешат того, кому придется отлаживать программу с взаимной блокировкой.
Новые библиотечные компоненты
В библиотеке java.util.concurrent из Java SE5 появился целый ряд новых классов, предназначенных для решения проблем многозадачности. Научившись пользоваться ими, вы сможете создавать более простые и надежные многозадачные программы.
В этом разделе приведено немало примеров использования различных компонентов. Другие, относительно редко встречающиеся компоненты, здесь не рассматриваются.
Так как компоненты предназначены для решения разных проблем, простого способа их упорядочения не существует, поэтому мы начнем с более простых примеров и постепенно перейдем к более сложным.
CountDownLatch
Класс синхронизирует задачи, заставляя их ожидать завершения группы операций, выполняемых другими задачами.
Объекту CountDownLatch присваивается начальное значение счетчика, а все задачи, вызвавшие await() для этого объекта, блокируются до момента обнуления счетчика. Другие задачи могут уменьшать счетчик, вызывая метод countDown() для объекта (обычно это делается тогда, когда задача завершает свою работу). Класс CountDownLatch рассчитан на «одноразовое» применение; счетчик не может возвращаться к прежнему состоянию. Если вам нужна версия с возможностью сброса счетчика, воспользуйтесь классом CyclicBarrier.
Задачи, вызывающие countDown(), не блокируются на время вызова. Только вызов await() блокируется до момента обнуления счетчика.
Типичный способ применения — разделение задачи на n независимых подзадач и создание объекта CountDownLatch с начальным значением n. При завершении каждая подзадача вызывает countDown() для объекта синхронизации. Потоки, ожидающие решения общей задачи, блокируются вызовом await(). Описанная методика продемонстрирована в следующем примере:
//: concurrency/CountDownLatchDemo.java
import java.util.concurrent.*;
import java.util.*;
import static net.mindview.util.Print.*;
// Часть основной задачи.:
class TaskPortion implements Runnable {
private static int counter = 0;
private final int id = counter++;
private static Random rand = new Random(47);
private final CountDownLatch latch;
TaskPortion(CountDownLatch latch) {
this.latch = latch;
}
public void run() {
try {
doWork();
latch.countDown();
} catch(InterruptedException ex) {
// Приемлемый вариант выхода
}
}
public void doWork() throws InterruptedException {
TimeUnit.MILLISECONDS.sleep(rand.nextInt(2000));
print(this + "completed");
}
public String toString() {
return String.format("%1$-3d ", id);
}
}
// Ожидание по объекту CountDownLatch:
class WaitingTask implements Runnable {
private static int counter = 0;
private final int id = counter++;
private final CountDownLatch latch;
WaitingTask(CountDownLatch latch) {
this.latch = latch;
}
public void run() {
try {
latch.await();
print("Latch barrier passed for " + this);
} catch(InterruptedException ex) {
print(this + " interrupted");
}
}
public String toString() {
return String.format("WaitingTask %1$-3d ", id);
}
}
public class CountDownLatchDemo {
static final int SIZE = 100;
public static void main(String[] args) throws Exception {
ExecutorService exec = Executors.newCachedThreadPool();
// Все подзадачи совместно используют один объект CountDownLatch:
CountDownLatch latch = new CountDownLatch(SIZE);
for(int i = 0; i < 10; i++)
exec.execute(new WaitingTask(latch));
for(int i = 0; i < SIZE; i++)
exec.execute(new TaskPortion(latch));
print("Launched all tasks");
exec.shutdown(); // Выход по завершению всех задач
}
} /* (Execute to see output) *///:~
TaskPortion некоторое время ожидает, имитируя выполнение части задачи, а класс WaitingTask представляет некую часть системы, которая обязана дождаться завершения всех подзадач. Все задачи используют один и тот же объект CountDownLatch, определяемый в main().
CyclicBarrier
Класс CyclicBarrier используется при создании группы параллельно выполняемых задач, завершения которых необходимо дождаться до перехода к следующей фазе. Все параллельные задачи «приостанавливаются» у барьера, чтобы сделать возможным их согласованное продвижение вперед. Класс очень похож на CountDownLatch, за одним важным исключением: CountDownLatch является «одноразовым», a CyclicBarrier может использоваться снова и снова.
Имитации привлекали меня с первых дней работы с компьютерами, и параллельные вычисления играют в них ключевую роль. Даже самая первая программа, которую я написал на BASIC, имитировала скачки на ипподроме. Вот как выглядит объектно-ориентированная, многопоточная версия этой программы с использованием CyclicBarrier:
//: concurrency/HorseRace.java
// Using CyclicBarriers.
import java.util.concurrent.*;
import java.util.*;
import static net.mindview.util.Print.*;
class Horse implements Runnable {
private static int counter = 0;
private final int id = counter++;
private int strides = 0;
private static Random rand = new Random(47);
private static CyclicBarrier barrier;
public Horse(CyclicBarrier b) { barrier = b; }
public synchronized int getStrides() { return strides; }
public void run() {
try {
while(!Thread.interrupted()) {
synchronized(this) {
strides += rand.nextInt(3); // Produces 0, 1 or 2
}
barrier.await();
}
} catch(InterruptedException e) {
// Приемлемый вариант выхода
} catch(BrokenBarrierException e) {
// Исключение, которое нас интересует
throw new RuntimeException(e);
}
}
public String toString() { return "Horse " + id + " "; }
public String tracks() {
StringBuilder s = new StringBuilder();
for(int i = 0; i < getStrides(); i++)
s.append("*");
s.append(id);
return s.toString();
}
}
public class HorseRace {
static final int FINISH_LINE = 75;
private List<Horse> horses = new ArrayList<Horse>();
private ExecutorService exec =
Executors.newCachedThreadPool();
private CyclicBarrier barrier;
public HorseRace(int nHorses, final int pause) {
barrier = new CyclicBarrier(nHorses, new Runnable() {
public void run() {
StringBuilder s = new StringBuilder();
for(int i = 0; i < FINISH_LINE; i++)
s.append("="); // Забор на беговой дорожке
print(s);
for(Horse horse : horses)
print(horse.tracks());
for(Horse horse : horses)
if(horse.getStrides() >= FINISH_LINE) {
print(horse + "won!");
exec.shutdownNow();
return;
}
try {
TimeUnit.MILLISECONDS.sleep(pause);
} catch(InterruptedException e) {
print("barrier-action sleep interrupted");
}
}
});
for(int i = 0; i < nHorses; i++) {
Horse horse = new Horse(barrier);
horses.add(horse);
exec.execute(horse);
}
}
public static void main(String[] args) {
int nHorses = 7;
int pause = 200;
if(args.length > 0) { // Необязательный аргумент
int n = new Integer(args[0]);
nHorses = n > 0 ? n : nHorses;
}
if(args.length > 1) { // Необязательный аргумент
int p = new Integer(args[1]);
pause = p > -1 ? p : pause;
}
new HorseRace(nHorses, pause);
}
} /* (Execute to see output) *///:~
Для объекта CyclicBarrier можно задать «барьерное действие» — объект Runnable, автоматически запускаемый при обнулении счетчика (еще одно отличие CyclicBarrier от CountdownLatch). В нашем примере барьерное действие определяется в виде безымянного класса, передаваемого конструктору CyclicBarrier.
Я попытался сделать так, чтобы каждый объект лошади отображал себя, но порядок отображения зависел от диспетчера задач. Благодаря CyclicBarrier каждая лошадь делает то, что ей необходимо для продвижения вперед, а затем ожидает у барьера перемещения всех остальных лошадей. Когда все лошади переместятся, CyclicBarrier автоматически вызывает «барьерную» задачу Runnable, чтобы отобразить всех лошадей по порядку вместе с барьером. Как только все задачи пройдут барьер, последний автоматически становится готовым для следующего захода.
DelayQueue
Класс представляет неограниченную блокирующую очередь объектов, реализующих интерфейс Delayed. Объект может быть извлечен из очереди только после истечения задержки. Очередь сортируется таким образом, что объект в начале очереди обладает наибольшим сроком истечения задержки. Если задержка ни у одного объекта не истекла, начального элемента нет, и вызов poll() возвращает null (из-за этого в очередь не могут помещаться элементы null).
В следующем примере объекты, реализующие Delayed, сами являются задачами, a DelayedTaskContainer берет задачу с наибольшей просроченной задержкой и запускает ее. Таким образом, DelayQueue является разновидностью приоритетной очереди.
//: concurrency/DelayQueueDemo.java
import java.util.concurrent.*;
import java.util.*;
import static java.util.concurrent.TimeUnit.*;
import static net.mindview.util.Print.*;
class DelayedTask implements Runnable, Delayed {
private static int counter = 0;
private final int id = counter++;
private final int delta;
private final long trigger;
protected static List<DelayedTask> sequence =
new ArrayList<DelayedTask>();
public DelayedTask(int delayInMilliseconds) {
delta = delayInMilliseconds;
trigger = System.nanoTime() +
NANOSECONDS.convert(delta, MILLISECONDS);
sequence.add(this);
}
public long getDelay(TimeUnit unit) {
return unit.convert(
trigger - System.nanoTime(), NANOSECONDS);
}
public int compareTo(Delayed arg) {
DelayedTask that = (DelayedTask)arg;
if(trigger < that.trigger) return -1;
if(trigger > that.trigger) return 1;
return 0;
}
public void run() { printnb(this + " "); }
public String toString() {
return String.format("[%1$-4d]", delta) +
" Task " + id;
}
public String summary() {
return "(" + id + ":" + delta + ")";
}
public static class EndSentinel extends DelayedTask {
private ExecutorService exec;
public EndSentinel(int delay, ExecutorService e) {
super(delay);
exec = e;
}
public void run() {
for(DelayedTask pt : sequence) {
printnb(pt.summary() + " ");
}
print();
print(this + " Calling shutdownNow()");
exec.shutdownNow();
}
}
}
class DelayedTaskConsumer implements Runnable {
private DelayQueue<DelayedTask> q;
public DelayedTaskConsumer(DelayQueue<DelayedTask> q) {
this.q = q;
}
public void run() {
try {
while(!Thread.interrupted())
q.take().run(); // Выполнение задачи в текущем потоке
} catch(InterruptedException e) {
// Приемлемый вариант выхода
}
print("Finished DelayedTaskConsumer");
}
}
public class DelayQueueDemo {
public static void main(String[] args) {
Random rand = new Random(47);
ExecutorService exec = Executors.newCachedThreadPool();
DelayQueue<DelayedTask> queue =
new DelayQueue<DelayedTask>();
// Очередь заполняется задачами со случайной задержкой:
for(int i = 0; i < 20; i++)
queue.put(new DelayedTask(rand.nextInt(5000)));
// Назначение точки остановки
queue.add(new DelayedTask.EndSentinel(5000, exec));
exec.execute(new DelayedTaskConsumer(queue));
}
}
<spoiler text="Output:">
[128 ] Task 11 [200 ] Task 7 [429 ] Task 5 [520 ] Task 18 [555 ] Task 1 [961 ]
Task 4 [998 ] Task 16 [1207] Task 9 [1693] Task 2 [1809] Task 14 [1861]
Task 3 [2278] Task 15 [3288] Task 10 [3551] Task 12 [4258] Task 0 [4258]
Task 19 [4522] Task 8 [4589] Task 13 [4861] Task 17 [4868] Task 6 (0:4258)
(1:555) (2:1693) (3:1861) (4:961) (5:429) (6:4868) (7:200) (8:4522) (9:1207)
(10:3288) (11:128) (12:3551) (13:4589) (14:1809) (15:2278) (16:998) (17:4861)
(18:520) (19:4258) (20:5000)
[5000] Task 20 Calling shutdownNow()
Finished DelayedTaskConsumer
</spoiler>
DelayedTask содержит контейнер List<DelayedTask> с именем sequence, в котором сохраняется порядок создания задач, и мы видим, что сортировка действительно выполняется.
Интерфейс Delayed содержит единственный метод getDelay(), который сообщает, сколько времени осталось до истечения задержки или как давно задержка истекла. Метод заставляет нас использовать класс TimeUnit, потому что его аргумент относится именно к этому типу. Впрочем, этот класс очень удобен, поскольку он позволяет легко преобразовывать единицы без каких-либо вычислений. Например, значение delta хранится в миллисекундах, а метод Java SE5 System.nanoTime() выдает значение в наносекундах. Чтобы преобразовать значение delta, достаточно указать исходные и итоговые единицы:
NANOSECONDS.convert(delta. MILL ISECONDS);
В getDelay() желаемые единицы передаются в аргументе unit. Аргумент используется для преобразования времени задержки во временные единицы, используемые вызывающей стороной.
Для выполнения сортировки интерфейс Delayed также наследует интерфейс Comparable, поэтому необходимо реализовать метод compareTo() для выполнения осмысленных сравнений. Методы toString() и summary() обеспечивают форматирование вывода.
Из выходных данных видно, что порядок создания задач не влияет на порядок их выполнения — вместо этого задачи, как и предполагалось, выполняются в порядке следования задержек.
PriorityBlockingQueue
Фактически класс PriorityBlockingQueue представляет приоритетную очередь с блокирующими операциями выборки. В следующем примере объектами в приоритетной очереди являются задачи, покидающие очередь в порядке приоритетов. Для определения этого порядка в класс PrioritizedTask включается поле priority:
//: concurrency/PriorityBlockingQueueDemo.java
import java.util.concurrent.*;
import java.util.*;
import static net.mindview.util.Print.*;
class PrioritizedTask implements
Runnable, Comparable<PrioritizedTask> {
private Random rand = new Random(47);
private static int counter = 0;
private final int id = counter++;
private final int priority;
protected static List<PrioritizedTask> sequence =
new ArrayList<PrioritizedTask>();
public PrioritizedTask(int priority) {
this.priority = priority;
sequence.add(this);
}
public int compareTo(PrioritizedTask arg) {
return priority < arg.priority ? 1 :
(priority > arg.priority ? -1 : 0);
}
public void run() {
try {
TimeUnit.MILLISECONDS.sleep(rand.nextInt(250));
} catch(InterruptedException e) {
// Приемлемый вариант выхода
}
print(this);
}
public String toString() {
return String.format("[%1$-3d]", priority) +
" Task " + id;
}
public String summary() {
return "(" + id + ":" + priority + ")";
}
public static class EndSentinel extends PrioritizedTask {
private ExecutorService exec;
public EndSentinel(ExecutorService e) {
super(-1); // Минимальный приоритет в этой программе
exec = e;
}
public void run() {
int count = 0;
for(PrioritizedTask pt : sequence) {
printnb(pt.summary());
if(++count % 5 == 0)
print();
}
print();
print(this + " Calling shutdownNow()");
exec.shutdownNow();
}
}
}
class PrioritizedTaskProducer implements Runnable {
private Random rand = new Random(47);
private Queue<Runnable> queue;
private ExecutorService exec;
public PrioritizedTaskProducer(
Queue<Runnable> q, ExecutorService e) {
queue = q;
exec = e; // Используется для EndSentinel
}
public void run() {
// Неограниченная очередь без блокировки.
// Быстрое заполнение случайными приоритетами:
for(int i = 0; i < 20; i++) {
queue.add(new PrioritizedTask(rand.nextInt(10)));
Thread.yield();
}
// Добавление высокоприоритетных задач:
try {
for(int i = 0; i < 10; i++) {
TimeUnit.MILLISECONDS.sleep(250);
queue.add(new PrioritizedTask(10));
}
// Добавление заданий, начиная с наименьших приоритетов:
for(int i = 0; i < 10; i++)
queue.add(new PrioritizedTask(i));
// Предохранитель для остановки всех задач::
queue.add(new PrioritizedTask.EndSentinel(exec));
} catch(InterruptedException e) {
// Приемлемый вариант выхода
}
print("Finished PrioritizedTaskProducer");
}
}
class PrioritizedTaskConsumer implements Runnable {
private PriorityBlockingQueue<Runnable> q;
public PrioritizedTaskConsumer(
PriorityBlockingQueue<Runnable> q) {
this.q = q;
}
public void run() {
try {
while(!Thread.interrupted())
// Использование текущего потока для запуска задачи:
q.take().run();
} catch(InterruptedException e) {
// Приемлемый вариант выхода
}
print("Finished PrioritizedTaskConsumer");
}
}
public class PriorityBlockingQueueDemo {
public static void main(String[] args) throws Exception {
Random rand = new Random(47);
ExecutorService exec = Executors.newCachedThreadPool();
PriorityBlockingQueue<Runnable> queue =
new PriorityBlockingQueue<Runnable>();
exec.execute(new PrioritizedTaskProducer(queue, exec));
exec.execute(new PrioritizedTaskConsumer(queue));
}
} /* (Execute to see output) *///:~
Как и в предыдущем примере, последовательность создания объектов PrioritizedTask сохраняется в контейнере List sequence для сравнения с фактическим порядком выполнения. Метод run() делает небольшую паузу, а затем выводит информацию об объекте, а предохранитель EndSentinel выполняет те же функции, что и прежде.
PrioritizedTaskProducer и PrioritizedTaskConsumer связываются друг с другом через PriorityBlockingQueue. Так как сам блокирующий характер очереди обеспечивает всю необходимую синхронизацию, явная синхронизация не нужна — при чтении вам не нужно думать о том, содержит ли очередь элементы, потому что при отсутствии элементов очередь просто заблокирует читающую сторону.
Управление оранжереей на базе ScheduledExecutor
В главе 10 была представлена система управления гипотетической оранжереей, которая включала (отключала) различные устройства и регулировала их работу. Происходящее можно преобразовать в контекст многозадачности: каждое событие оранжереи представляет собой задачу, запускаемую в заранее заданное время. Класс ScheduledThreadPoolExecutor предоставляет именно тот сервис, который необходим для решения задачи. Используя методы schedule() (однократный запуск задачи) или scheduleAtFixedRate() (повторение задачи с постоянным промежутком), мы создаем объекты Runnable, которые должны запуститься в положенное время. Сравните это решение с тем, что приведено в главе 10, и посмотрите, насколько оно упрощается благодаря готовой функциональности ScheduledThreadPoolExecutor:
//: concurrency/GreenhouseScheduler.java
// Новая реализация іnnerclasses/GreenhouseController.java
// с использованием ScheduledThreadPoolExecutor.
// {Args: 5000}
import java.util.concurrent.*;
import java.util.*;
public class GreenhouseScheduler {
private volatile boolean light = false;
private volatile boolean water = false;
private String thermostat = "Day";
public synchronized String getThermostat() {
return thermostat;
}
public synchronized void setThermostat(String value) {
thermostat = value;
}
ScheduledThreadPoolExecutor scheduler =
new ScheduledThreadPoolExecutor(10);
public void schedule(Runnable event, long delay) {
scheduler.schedule(event,delay,TimeUnit.MILLISECONDS);
}
public void
repeat(Runnable event, long initialDelay, long period) {
scheduler.scheduleAtFixedRate(
event, initialDelay, period, TimeUnit.MILLISECONDS);
}
class LightOn implements Runnable {
public void run() {
// Put hardware control code here to
// physically turn on the light.
System.out.println("Turning on lights");
light = true;
}
}
class LightOff implements Runnable {
public void run() {
// Put hardware control code here to
// physically turn off the light.
System.out.println("Turning off lights");
light = false;
}
}
class WaterOn implements Runnable {
public void run() {
// Put hardware control code here.
System.out.println("Turning greenhouse water on");
water = true;
}
}
class WaterOff implements Runnable {
public void run() {
// Put hardware control code here.
System.out.println("Turning greenhouse water off");
water = false;
}
}
class ThermostatNight implements Runnable {
public void run() {
// Put hardware control code here.
System.out.println("Thermostat to night setting");
setThermostat("Night");
}
}
class ThermostatDay implements Runnable {
public void run() {
// Put hardware control code here.
System.out.println("Thermostat to day setting");
setThermostat("Day");
}
}
class Bell implements Runnable {
public void run() { System.out.println("Bing!"); }
}
class Terminate implements Runnable {
public void run() {
System.out.println("Terminating");
scheduler.shutdownNow();
// Must start a separate task to do this job,
// since the scheduler has been shut down:
new Thread() {
public void run() {
for(DataPoint d : data)
System.out.println(d);
}
}.start();
}
}
// New feature: data collection
static class DataPoint {
final Calendar time;
final float temperature;
final float humidity;
public DataPoint(Calendar d, float temp, float hum) {
time = d;
temperature = temp;
humidity = hum;
}
public String toString() {
return time.getTime() +
String.format(
" temperature: %1$.1f humidity: %2$.2f",
temperature, humidity);
}
}
private Calendar lastTime = Calendar.getInstance();
{ // Adjust date to the half hour
lastTime.set(Calendar.MINUTE, 30);
lastTime.set(Calendar.SECOND, 00);
}
private float lastTemp = 65.0f;
private int tempDirection = +1;
private float lastHumidity = 50.0f;
private int humidityDirection = +1;
private Random rand = new Random(47);
List<DataPoint> data = Collections.synchronizedList(
new ArrayList<DataPoint>());
class CollectData implements Runnable {
public void run() {
System.out.println("Collecting data");
synchronized(GreenhouseScheduler.this) {
// Pretend the interval is longer than it is:
lastTime.set(Calendar.MINUTE,
lastTime.get(Calendar.MINUTE) + 30);
// One in 5 chances of reversing the direction:
if(rand.nextInt(5) == 4)
tempDirection = -tempDirection;
// Store previous value:
lastTemp = lastTemp +
tempDirection * (1.0f + rand.nextFloat());
if(rand.nextInt(5) == 4)
humidityDirection = -humidityDirection;
lastHumidity = lastHumidity +
humidityDirection * rand.nextFloat();
// Calendar must be cloned, otherwise all
// DataPoints hold references to the same lastTime.
// For a basic object like Calendar, clone() is OK.
data.add(new DataPoint((Calendar)lastTime.clone(),
lastTemp, lastHumidity));
}
}
}
public static void main(String[] args) {
GreenhouseScheduler gh = new GreenhouseScheduler();
gh.schedule(gh.new Terminate(), 5000);
// Former "Restart" class not necessary:
gh.repeat(gh.new Bell(), 0, 1000);
gh.repeat(gh.new ThermostatNight(), 0, 2000);
gh.repeat(gh.new LightOn(), 0, 200);
gh.repeat(gh.new LightOff(), 0, 400);
gh.repeat(gh.new WaterOn(), 0, 600);
gh.repeat(gh.new WaterOff(), 0, 800);
gh.repeat(gh.new ThermostatDay(), 0, 1400);
gh.repeat(gh.new CollectData(), 500, 500);
}
} /* (Execute to see output) *///:~
В этой версии, помимо реорганизации кода, добавляется новая возможность: сбор данных о температуре и влажности в оранжерее. Объект DataPoint содержит и выводит одну точку данных, а запланированная задача CollectData генерирует данные имитации и включает их в List<DataPoint> при каждом запуске.
Обратите внимание на ключевые слова volatile и synchronized; благодаря им задачи не мешают работе друг друга. Все методы контейнера List с элементами DataPoint синхронизируются с использованием метода synchronizedList() библиотеки java.util.Соllectiоns при создании List.
Семафоры
При обычной блокировке доступ к ресурсу в любой момент времени разрешается только одной задаче. Семафор со счетчиком позволяет n задачам одновременно обращаться к ресурсу. Можно считать, что семафор «выдает разрешения» на использование ресурса, хотя никаких реальных объектов разрешений в этой схеме нет.
В качестве примера рассмотрим концепцию пула объектов: объекты, входящие в пул, «выдаются» для использования, а затем снова возвращаются в пул после того, как пользователь закончит работу с ними. Эта функциональность инкапсулируется в параметризованном классе:
//: concurrency/Pool.java
// Использование Semaphore в Pool ограничивает количество
// задач, которые могут использовать ресурс.
import java.util.concurrent.*;
import java.util.*;
public class Pool<T> {
private int size;
private List<T> items = new ArrayList<T>();
private volatile boolean[] checkedOut;
private Semaphore available;
public Pool(Class<T> classObject, int size) {
this.size = size;
checkedOut = new boolean[size];
available = new Semaphore(size, true);
// Заполнение пула объектами :
for(int i = 0; i < size; ++i)
try {
// Предполагается наличие конструктора по умолчанию:
items.add(classObject.newInstance());
} catch(Exception e) {
throw new RuntimeException(e);
}
}
public T checkOut() throws InterruptedException {
available.acquire();
return getItem();
}
public void checkIn(T x) {
if(releaseItem(x))
available.release();
}
private synchronized T getItem() {
for(int i = 0; i < size; ++i)
if(!checkedOut[i]) {
checkedOut[i] = true;
return items.get(i);
}
return null; // Семафор предотвращает переход в зту точку
}
private synchronized boolean releaseItem(T item) {
int index = items.indexOf(item);
if(index == -1) return false; // Отсутствует в списке
if(checkedOut[index]) {
checkedOut[index] = false;
return true;
}
return false; // He был освобожден
}
}
В этой упрощенной форме конструктор использует newInstance() для заполнения пула объектами. Если вам понадобится новый объект, вызовите checkOut(); завершив работу с объектом, передайте его checkIn().
Логический массив checkedOut отслеживает выданные объекты. Для управления его содержимым используются методы getItem() и releaseItem(). В свою очередь, эти методы защищены семафором available, поэтому в checkOut() семафор available блокирует дальнейшее выполнение при отсутствии семафорных разрешений (то есть при отсутствии объектов в пуле). Метод checkIn() проверяет действительность возвращаемого объекта, и, если объект действителен, разрешение возвращается семафору.
Для примера мы воспользуемся классом Fat. Создание объектов этого класса является высокозатратной операцией, а на выполнение конструктора уходит много времени:
//: concurrency/Fat.java
// Объекты, создание которых занимает много времени.
public class Fat {
private volatile double d; // Предотвращает оптимизацию
private static int counter = 0;
private final int id = counter++;
public Fat() {
// Затратная, прервываемая операция:
for(int i = 1; i < 10000; i++) {
d += (Math.PI + Math.E) / (double)i;
}
}
public void operation() { System.out.println(this); }
public String toString() { return "Fat id: " + id; }
}
Мы создадим пул объектов Fat, чтобы свести к минимуму затраты на выполнение конструктора. Для тестирования класса Pool будет создана задача, которая забирает объекты Fat для использования, удерживает их в течение некоторого времени, а затем возвращает обратно:
//: concurrency/SemaphoreDemo.java
// Тестирование класса Pool
import java.util.concurrent.*;
import java.util.*;
import static net.mindview.util.Print.*;
// Задача для получения ресурса из пула:
class CheckoutTask<T> implements Runnable {
private static int counter = 0;
private final int id = counter++;
private Pool<T> pool;
public CheckoutTask(Pool<T> pool) {
this.pool = pool;
}
public void run() {
try {
T item = pool.checkOut();
print(this + "checked out " + item);
TimeUnit.SECONDS.sleep(1);
print(this +"checking in " + item);
pool.checkIn(item);
} catch(InterruptedException e) {
// Приемлемый способ завершения
}
}
public String toString() {
return "CheckoutTask " + id + " ";
}
}
public class SemaphoreDemo {
final static int SIZE = 25;
public static void main(String[] args) throws Exception {
final Pool<Fat> pool =
new Pool<Fat>(Fat.class, SIZE);
ExecutorService exec = Executors.newCachedThreadPool();
for(int i = 0; i < SIZE; i++)
exec.execute(new CheckoutTask<Fat>(pool));
print("All CheckoutTasks created");
List<Fat> list = new ArrayList<Fat>();
for(int i = 0; i < SIZE; i++) {
Fat f = pool.checkOut();
printnb(i + ": main() thread checked out ");
f.operation();
list.add(f);
}
Future<?> blocked = exec.submit(new Runnable() {
public void run() {
try {
// Семафор предотвращает лишний вызов checkout.
// поэтому следующий вызов блокируется:
pool.checkOut();
} catch(InterruptedException e) {
print("checkOut() Interrupted");
}
}
});
TimeUnit.SECONDS.sleep(2);
blocked.cancel(true); // Выход из заблокированного вызова
print("Checking in objects in " + list);
for(Fat f : list)
pool.checkIn(f);
for(Fat f : list)
pool.checkIn(f); // Второй вызов checkIn игнорируется
exec.shutdown();
}
} /* (Execute to see output) *///:~
В коде main() создается объект Pool для хранения объектов Fat, после чего группа задач CheckoutTask начинает использовать Pool. Далее поток main() начинает выдавать объекты Fat, не возвращая их обратно. После того как все объекты пула будут выданы, семафор запрещает дальнейшие выдачи. Метод run() блокируется, и через две секунды вызывается метод cancel(). Лишние возвраты Pool игнорирует.
Exchanger
Класс Exchanger представляет собой «барьер», который меняет местами объекты двух задач. На подходе к барьеру задачи имеют один объект, а на выходе — объект, ранее удерживавшийся другой задачей. Объекты Exchanger обычно используются в тех ситуациях, когда одна задача создает высокозатратные объекты, а другая задача эти объекты потребляет.
Чтобы опробовать на практике класс Exchanger, мы создадим задачу-поставщика и задачу-потребителя, которые благодаря параметризации и генераторам могут работать с объектами любого типа. Затем эти параметризованные задачи будут применены к классу Fat. ExchangerProducer и ExchangerConsumer меняют местами List<T>; при вызове метода Exchanger.exchange() вызов блокируется до тех пор, пока парная задача не вызовет свой метод exchange(), после чего оба метода exchange() завершаются, а контейнеры List<T> меняются местами:
//: concurrency/ExchangerDemo.java
import java.util.concurrent.*;
import java.util.*;
import net.mindview.util.*;
class ExchangerProducer<T> implements Runnable {
private Generator<T> generator;
private Exchanger<List<T>> exchanger;
private List<T> holder;
ExchangerProducer(Exchanger<List<T>> exchg,
Generator<T> gen, List<T> holder) {
exchanger = exchg;
generator = gen;
this.holder = holder;
}
public void run() {
try {
while(!Thread.interrupted()) {
for(int i = 0; i < ExchangerDemo.size; i++)
holder.add(generator.next());
// Заполненный контейнер заменяется пустым::
holder = exchanger.exchange(holder);
}
} catch(InterruptedException e) {
// Приемлемый способ завершения.
}
}
}
class ExchangerConsumer<T> implements Runnable {
private Exchanger<List<T>> exchanger;
private List<T> holder;
private volatile T value;
ExchangerConsumer(Exchanger<List<T>> ex, List<T> holder){
exchanger = ex;
this.holder = holder;
}
public void run() {
try {
while(!Thread.interrupted()) {
holder = exchanger.exchange(holder);
for(T x : holder) {
value = x; // Выборка значения
holder.remove(x); // Нормально для CopyOnWriteArrayList
}
}
} catch(InterruptedException e) {
// Приемлемый способ завершения.
}
System.out.println("Final value: " + value);
}
}
public class ExchangerDemo {
static int size = 10;
static int delay = 5; // Секунды
public static void main(String[] args) throws Exception {
if(args.length > 0)
size = new Integer(args[0]);
if(args.length > 1)
delay = new Integer(args[1]);
ExecutorService exec = Executors.newCachedThreadPool();
Exchanger<List<Fat>> xc = new Exchanger<List<Fat>>();
List<Fat>
producerList = new CopyOnWriteArrayList<Fat>(),
consumerList = new CopyOnWriteArrayList<Fat>();
exec.execute(new ExchangerProducer<Fat>(xc,
BasicGenerator.create(Fat.class), producerList));
exec.execute(
new ExchangerConsumer<Fat>(xc,consumerList));
TimeUnit.SECONDS.sleep(delay);
exec.shutdownNow();
}
}
<spoiler text="Output:"> (Sample)
Final value: Fat id: 29999
</spoiler>
В методе main() для обеих задач создается один объект Exchanger, а для перестановки создаются два контейнера CopyOnWriteArrayList. Эта разновидность List нормально переносит вызов метода remove() при перемещении по списку, не выдавая исключения ConcurrentModificationException.
ExchangerProducer заполняет список, а затем меняет местами заполненный список с пустым, передаваемым от ExchangerConsumer. Благодаря Exchanger заполнение списка происходит одновременно с использованием уже заполненного списка.
Моделирование
Одна из самых интересных областей применения параллельных вычислений — всевозможные имитации и моделирование. Каждый компонент модели оформляется в виде отдельной задачи, что значительно упрощает его программирование.
Примеры HorseRace.java и GreenhouseScheduler.java, приведенные ранее, тоже можно считать своего рода имитаторами.
Модель кассира
В этой классической модели объекты появляются случайным образом и обслуживаются за случайное время ограниченным количеством серверов. Моделирование позволяет определить идеальное количество серверов. Продолжительность обслуживания в следующей модели зависит от клиента и определяется случайным образом. Вдобавок мы не знаем, сколько новых клиентов будет прибывать за каждый период времени, поэтому эта величина тоже определяется случайным образом.
//: concurrency/BankTellerSimulation.java
// Пример использования очередей и многопоточного программирования..
// {Args: 5}
import java.util.concurrent.*;
import java.util.*;
// Объекты, доступные только для чтения, не требуют синхронизации:
class Customer {
private final int serviceTime;
public Customer(int tm) { serviceTime = tm; }
public int getServiceTime() { return serviceTime; }
public String toString() {
return "[" + serviceTime + "]";
}
}
// Очередь клиентов умеет выводить информацию о своем состоянии:
class CustomerLine extends ArrayBlockingQueue<Customer> {
public CustomerLine(int maxLineSize) {
super(maxLineSize);
}
public String toString() {
if(this.size() == 0)
return "[Empty]";
StringBuilder result = new StringBuilder();
for(Customer customer : this)
result.append(customer);
return result.toString();
}
}
// Случайное добавление клиентов в очередь:
class CustomerGenerator implements Runnable {
private CustomerLine customers;
private static Random rand = new Random(47);
public CustomerGenerator(CustomerLine cq) {
customers = cq;
}
public void run() {
try {
while(!Thread.interrupted()) {
TimeUnit.MILLISECONDS.sleep(rand.nextInt(300));
customers.put(new Customer(rand.nextInt(1000)));
}
} catch(InterruptedException e) {
System.out.println("CustomerGenerator interrupted");
}
System.out.println("CustomerGenerator terminating");
}
}
class Teller implements Runnable, Comparable<Teller> {
private static int counter = 0;
private final int id = counter++;
// Счетчик клиентов, обслуженных за текущую смену:
private int customersServed = 0;
private CustomerLine customers;
private boolean servingCustomerLine = true;
public Teller(CustomerLine cq) { customers = cq; }
public void run() {
try {
while(!Thread.interrupted()) {
Customer customer = customers.take();
TimeUnit.MILLISECONDS.sleep(
customer.getServiceTime());
synchronized(this) {
customersServed++;
while(!servingCustomerLine)
wait();
}
}
} catch(InterruptedException e) {
System.out.println(this + "interrupted");
}
System.out.println(this + "terminating");
}
public synchronized void doSomethingElse() {
customersServed = 0;
servingCustomerLine = false;
}
public synchronized void serveCustomerLine() {
assert !servingCustomerLine:"quot;already serving: " + this;
servingCustomerLine = true;
notifyAll();
}
public String toString() { return "Teller " + id + " "; }
public String shortString() { return "T" + id; }
// Используется приоритетной очередью:
public synchronized int compareTo(Teller other) {
return customersServed < other.customersServed ? -1 :
(customersServed == other.customersServed ? 0 : 1);
}
}
class TellerManager implements Runnable {
private ExecutorService exec;
private CustomerLine customers;
private PriorityQueue<Teller> workingTellers =
new PriorityQueue<Teller>();
private Queue<Teller> tellersDoingOtherThings =
new LinkedList<Teller>();
private int adjustmentPeriod;
private static Random rand = new Random(47);
public TellerManager(ExecutorService e,
CustomerLine customers, int adjustmentPeriod) {
exec = e;
this.customers = customers;
this.adjustmentPeriod = adjustmentPeriod;
// Начинаем с одного кассира:
Teller teller = new Teller(customers);
exec.execute(teller);
workingTellers.add(teller);
}
public void adjustTellerNumber() {
// Фактически это система управления. Регулировка числовых
// параметров позволяет выявить проблемы стабильности
// в механизме управления.
// Если очередь слишком длинна, добавить другого кассира:
if(customers.size() / workingTellers.size() > 2) {
// Если кассиры отдыхают или заняты
// другими делами, вернуть одного из них:
if(tellersDoingOtherThings.size() > 0) {
Teller teller = tellersDoingOtherThings.remove();
teller.serveCustomerLine();
workingTellers.offer(teller);
return;
}
// Иначе создаем (нанимаем) нового кассира
Teller teller = new Teller(customers);
exec.execute(teller);
workingTellers.add(teller);
return;
}
// Если очередь достаточно коротка, освободить кассира:
if(workingTellers.size() > 1 &&
customers.size() / workingTellers.size() < 2)
reassignOneTeller();
// Если очереди нет. достаточно одного кассира:
if(customers.size() == 0)
while(workingTellers.size() > 1)
reassignOneTeller();
}
// Поручаем кассиру другую работу или отправляем его отдыхать:
private void reassignOneTeller() {
Teller teller = workingTellers.poll();
teller.doSomethingElse();
tellersDoingOtherThings.offer(teller);
}
public void run() {
try {
while(!Thread.interrupted()) {
TimeUnit.MILLISECONDS.sleep(adjustmentPeriod);
adjustTellerNumber();
System.out.print(customers + " { ");
for(Teller teller : workingTellers)
System.out.print(teller.shortString() + " ");
System.out.println("}");
}
} catch(InterruptedException e) {
System.out.println(this + "interrupted");
}
System.out.println(this + "terminating");
}
public String toString() { return "TellerManager "; }
}
public class BankTellerSimulation {
static final int MAX_LINE_SIZE = 50;
static final int ADJUSTMENT_PERIOD = 1000;
public static void main(String[] args) throws Exception {
ExecutorService exec = Executors.newCachedThreadPool();
// Если очередь слишком длинна, клиенты уходят:
CustomerLine customers =
new CustomerLine(MAX_LINE_SIZE);
exec.execute(new CustomerGenerator(customers));
// TellerManager добавляет и убирает кассиров
// по мере необходимости:
exec.execute(new TellerManager(
exec, customers, ADJUSTMENT_PERIOD));
if(args.length > 0) // Необязательный аргумент
TimeUnit.SECONDS.sleep(new Integer(args[0]));
else {
System.out.println("Press 'Enter' to quit");
System.in.read();
}
exec.shutdownNow();
}
}
<spoiler text="Output:"> (Sample)
[429][200][207] { T0 T1 }
[861][258][140][322] { T0 T1 }
[575][342][804][826][896][984] { T0 T1 T2 }
[984][810][141][12][689][992][976][368][395][354] { T0 T1 T2 T3 }
Teller 2 interrupted
Teller 2 terminating
Teller 1 interrupted
Teller 1 terminating
TellerManager interrupted
TellerManager terminating
Teller 3 interrupted
Teller 3 terminating
Teller 0 interrupted
Teller 0 terminating
CustomerGenerator interrupted
CustomerGenerator terminating
</spoiler>
Объекты Customer очень просты; они содержат только поле данных final int. Так как эти объекты никогда не изменяют своего состояния, они являются объектами, доступными только для чтения, и поэтому требуют синхронизации или использования volatile. Вдобавок каждая задача Teller удаляет из очереди ввода только один объект Customer и работает с ним до завершения, поэтому задачи все равно будут работать с Customer последовательно.
Класс CustomerLine представляет собой общую очередь, в которой клиенты ожидают обслуживания. Он реализован в виде очереди ArrayBlockingQueue с методом toString(), который выводит результаты в желаемом формате. Генератор CustomerGenerator присоединяется к CustomerLine и ставит объекты Customer в очередь со случайными интервалами.
Teller извлекает клиентов Customer из CustomerLine и обрабатывает их последовательно, подсчитывая количество клиентов, обслуженных за текущую смену. Если клиентов не хватает, его можно перевести на другую работу (doSomethingElse()), а при появлении большого количества клиентов — снова вернуть на обслуживание очереди методом serveCustomerLine(). Чтобы приказать следующему кассиру вернуться к очереди, метод compareTo() проверяет количество обслуженных клиентов, чтобы приоритетная очередь автоматически ставила в начало кассира, работавшего меньше других.
Вся основная деятельность выполняется в TellerManager. Этот класс следит за всеми кассирами и за тем, что происходит с клиентами. Одна из интересных особенностей данной имитации заключается в том, что она пытается подобрать оптимальное количество кассиров для заданного потока покупателей. Пример встречается в методе adjustTellerNumber() — управляющей системе для надежной, стабильной регулировки количества кассиров. У всех управляющих систем в той или иной мере присутствуют проблемы со стабильностью; слишком быстрая реакция на изменения снижает стабильность, а слишком медленная переводит систему в одно из крайних состояний.
Резюме
В этой главе я постарался изложить основы многопоточного программирования с использованием потоков Java. Прочитав ее, читатель должен понять следующее:
- Программу можно разделить на несколько независимых задач.
- Необходимо заранее предусмотреть всевозможные проблемы, возникающие при завершении задач.
- Задачи, работающие с общими ресурсами, могут мешать друг другу. Основным средством предотвращения конфликтов является блокировка.
- В неаккуратно спроектированных многозадачных системах возможны взаимные блокировки.
Очень важно понимать, когда рационально использовать параллельное выполнение, а когда этого делать не стоит. Основные причины для его использования:
- управление несколькими подзадачами, одновременное выполнение которых позволяет эффективнее распоряжаться ресурсами компьютера (включая возможность незаметного распределения этих задач по нескольким процессорам);
- улучшенная организация кода;
- удобство для пользователя.
Классический пример распределения ресурсов — использование процессора во время ожидания завершения операций ввода/вывода. Классический пример чуткого пользовательского интерфейса — отслеживание нажатий кнопки «Прервать» во время продолжительного процесса загрузки.
Дополнительным преимуществом потоков является то, что они заменяют «тяжелое» переключение контекста процессов (порядка 1000 и более инструкций) «легким» переключением контекста выполнения (около 100 инструкций). Так как все потоки процесса разделяют одно и то же пространство памяти, легкое переключение затрагивает только выполнение программы и локальные переменные. С другой стороны, чередование процессов — тяжелое переключение контекста — требует обновления всего пространства памяти.
Основные недостатки многозадачности:
- Замедление программы, связанное с ожиданием освобождения блокированных ресурсов.
- Дополнительная нагрузка на процессор для управления потоками.
- Совершенно ненужная сложность, являющаяся следствием неудачных решений при проектировании программы.
- Аномальные ситуации: взаимные блокировки, конфликты доступа, гонки и т. д.
- Непоследовательное поведение на различных платформах. Например, при разработке некоторых примеров для данной книги я обнаружил ситуации гонки, быстро проявлявшиеся на некоторых компьютерах, но незаметные на других.
Пожалуй, основные трудности с потоками возникают тогда, когда несколько потоков одновременно пытаются использовать один и тот же ресурс — например, память объекта, и вы должны сделать так, чтобы этого ни в коем случае не произошло. Для этого нужно разумно использовать ключевое слово synchronized — полезный инструмент языка, который тем не менее необходимо хорошо понимать (без этого в программе может незаметно возникнуть опасность взаимной блокировки).
Вдобавок многозадачное программирование сродни искусству. Язык Java существует для того, чтобы вы могли свободно создавать столько объектов, сколько вам нужно для решения вашей задачи — по крайней мере, в теории это так. (Например, создание миллионов объектов для проведения проекционно-разностного анализа вряд ли будет иметь смысл в Java.) Однако оказывается, что количество потоков упирается в определенный «потолок», так как после превышения этой-границы потоки становятся неподатливыми. Это критическое число трудно определить, зачастую оно зависит от операционной системы и виртуальной машины Java, значение может находиться где-то в районе сотни, а может исчисляться тысячами. Если для решения своей задачи вам требуется небольшая группа потоков, это ограничение не актуально, но при разработке больших программ оно может создать затруднения.
------------------------
ТРИО теплый пол отзыв
Заработок на сокращении ссылок
Earnings on reducing links
Код PHP на HTML сайты
Категория: Книги по Java
Комментарии |