Threads und C++11

Nachdem ich ziemlich lange berufsbedingt Ausflüge in so bizarre Welten wie die von PHP, Perl oder gar Javascript machen musste, habe ich mir nun die Zeit genommen, endlich mal wieder nach C++ zu schauen. Mit gcc 4.7 lässt sich ja schon vieles von C++11 ausprobieren. Mich hat besonders der Stand des Multithreading interessiert. Als Beispiel muss (wenig überraschend) mal wieder ein Consumer/Producer-Problem herhalten,
hier produziert der Producer einfach Integer und die Consumer filter daraus die Primzahlen. Da bei uns momentan eher noch gcc 4.5 verbreitet ist, habe ich unter Verzicht auf einige Schmankerln den Code dafür kompilierbar gehalten.

Nun aber erstmal der Quellcode:

#include <vector>
#include <iostream>
#include <queue>
#include <thread>
#include <future>
#include <algorithm>

// data type used for prime numbers
typedef unsigned int prime_t;

bool is_prime(prime_t value)
{
  for (prime_t i = 2; i < value; ++i) {
    if (value % i == 0) {
      return false;
    }
  }
  return true;
}

// signal a consumer that it is ready
class QueueReady : public std::logic_error
{
public:
  QueueReady() : std::logic_error("producer ready") {}
};

class Queue
{
  std::queue items;
  static const size_t MAX_SIZE = 100000;
  bool producerDone; // flag set by producer if finished
  std::mutex m;
  std::condition_variable cnfull, cnempty;
public:
  Queue() : producerDone(false) {
  }
  void put(prime_t value) {
    std::unique_lock lock(m);
    while (items.size() >= MAX_SIZE) cnfull.wait(lock);
    items.push(value);
    cnempty.notify_all();
  }
  prime_t get() {
    std::unique_lock lock(m);
    while (items.empty()) {
      if (producerDone) throw QueueReady();
      cnempty.wait(lock);
    }
    prime_t v = items.front();
    items.pop();
    cnfull.notify_all();
    return v;
  }
  void setProducerReady() {
    std::unique_lock lock(m);
    producerDone = true;
  }
};

class Producer
{
  Queue& items;
  prime_t bound;
public:
  Producer(Queue& q, prime_t s) : items(q), bound(s) {}
  void operator()() {
    std::cout << "producer started\n";
    for(prime_t i = 2;i<bound;++i) {
      items.put(i);
    }
    items.setProducerReady();
    std::cout << "producer ready\n";
  }
};

class Consumer
{
  Queue& items;
  std::vector primes;
public:
  Consumer(Queue& q) : items(q) {}
  size_t operator()() {
    std::cout << "consumer started\n";
    while (1) {
      try {
        prime_t v = items.get();
        if (is_prime(v)) {
          primes.push_back(v);
        }
      }
      catch (QueueReady&) {
        std::cout << "consumer ready: " << primes.size() << " numbers\n";
        return primes.size();
      }
    }
  }
};

void usage()
{
  std::cerr << "./primeseq  \n"
            << "  bound: max number to check for prime (>2)\n"
            << "  threads: number of consumer threads\n"
            ;
  exit(0);
}

int main(int argc, char *argv[])
{
  if (argc != 3) usage();
  int bound = atoi(argv[1]);
  if (bound < 3) usage();
  int conc = atoi(argv[2]);
  if (conc < 1) usage();
  Queue q;
  typedef std::vector<std::future> cons_t;
  cons_t cons;
  for (int i=0;i<conc;++i) cons.push_back(std::async(std::launch::async,(Consumer(q))));
  std::thread prod((Producer(q,bound)));

  prod.join();
  size_t count = 0;
  std::for_each(cons.begin(),cons.end(),[&count](cons_t::value_type& i){ count += i.get(); });
  std::cout << "found " << count << " primes\n";
}

Der Beginn ist wohl einfach: an Headern brauchen wir vector und queue fuer genau diese Container,
iostream fuer ein paar Ausgaben, algorithm fuer for_each und thread und future wollen wir uns ja
genauer anschauen.
In Zeile 9 folgt ein typedef (ich habe es auch mal mit unsigned long long laufen lassen).
Zeile 11ff definiert eine ganz simple Funktion um zu testen, ob eine Zeile Primzahl ist.
Bis dahin nichts Neues, auch die Exception in Zeile 22ff ist nicht neu.

Und dann geht es los: Producer und Consumer kommunizieren ueber eine Queue, diese definiert die Klasse in den Zeilen 28ff. Wir greifen auf std::queue zurueck, Zugriff werden durch einen Mutex (Zeile 33) geschuetzt. Die Groesse der Queue haben wir beschraenkt (31). Da das Ganze nicht endlos laufen soll, ist der Producer ja irgendwann fertig, dieses Ereignis merken wir uns (32). Und schliesslich kommunizieren alle Beteiligten ueber die ueblichen Condition-Variablen (34). Die Queue wird ueber Aufruf von put (38ff) gefuellt und per get (44ff) geleert. Einzig bemerkenswert hier ist die get-Funktion: da wir nicht endlos laufen wollen, muss diese Funktion dem Aufrufer mitteilen, wann er aufhoeren kann. Dazu benutzen wir hier die Exception aus Zeile 22ff. Der Producer informiert die Queue ueber sein Arbeitsende per setProducerReady (55ff).
In Zeile 61ff definieren wir den Producer, der Einfachheit halber ist hier der function-call-operator ueberladen – das macht die Thread-Erzeugung nachher einfacher, ist aber nicht zwingend.
In Zeile 77ff folgt der Consumer, hier wird die QueueReady-Exception gefunden und fuer zum Taskende. Wieder ist der function-call-operator ueberladen (83ff) und liefert die Anzahl der gefunden Primzahlen zurueck.
Zeile 100ff definiert etwas Luxus, genauso wie die ersten Zeile in main(). Die eigentliche Arbeit wird in den paar restlichen Zeilen verrichtet und ist verblueffend einfach: wir erzeugen eine Instanz der Queue (116). In Zeile 119 starten wir die Consumer-Threads per std::async. Diese Funktion bekommt eine Policy und “irgendwas, was man aufrufen kann” (callable); und liefert ein std::future zurueck. Das future kapselt eine asynchrone Berechnung, die laeuft halt irgendwann ab. Erst beim Zugriff auf das Ergebnis per future::get wird sichergestellt, dass die Berechnung auch fertig ist. Future::get liefert als Ergebnis den Wert, welchen das callable liefert. In unserem Falle also die Anzahl der von diesem Thread gefundenen Primzahlen.
Im Prinzip macht std::async implizit genau das, was wir dann explizit mit dem Producer machen: wir erzeugen einen Thread (120) und warten spaeter auf dessen Beendigung (122). Die Zeilen 123f machen etwas für C++11 wohl sehr typisches (jedenfalls etwas das mir sehr gut gefaellt): sie iterieren per for_each ueber einen Container und lassen eine Berechnung per lamda-Funktion auf jedes Element los. Wenn auch die Syntax fuer die lamdas wohl mehr als nur gewoehnungsbeduerftig ist, ist das ein echter Gewinn an Lesbarkeit (nach Gewoehnung versteht sich). std::foreach(cons.begin(),cons.end(),…) ist an sich wohl klar. Neu ist

[&count](cons_t::value_type& i){ count += i.get(); }

Wir haben hier die 3 Teile jedes Lambdas:

  1. [] definiert den Zugriff auf den umgebenden Scope, &count meint wir greifen per Referenz auf diese Variable zu
  2. () definiert die Parameterliste, std::for_each ruft die Funktion ja jeweils pro Item auf, das typedef in Zeile 117 hilft uns bei der Typangabe
  3. {} definiert den Body, hier summieren wir ueber die Referenz auf count

Die Lambda-Funktion bekommt ja als Parameter ein future. Dessen future::get Funktion rufen wir auf. Im Prinzip wird hier ein thread::join gemacht und dann der Return-Wert der Thread-Funktion (hier Zeile 83ff) zurueckgeliefert.

Nach 124 steht in count also die Summe der von unseren Threads berechneten Primzahlen. Ich habe das Ganze beim Testen als Kontrolle verwendet – wenn der Aufruf mit 1 Thread eine andere Summe als der Aufruf mit 16 Threads liefert, dann stimmt wohl irgendetwas nicht.

Uebersetzt werden kann das Ganze dann mit
g++ -std=c++0x -Wall -g -pthread -o primeseq primeseq.cpp
Die Option -std=c++0x enabled die neuen Features, -pthread informiert den gcc, dass wir gerne Threads haetten.

Alles in allem erstaunlich einfach und fuer mich (von pthread kommend) keine grosse Umstellung: die Pthread-Implementierung sieht bis Zeile 116 nahezu identisch aus.
Und die Lambdas – so haesslich sie auch aussehen (syntaktisch) – ich fange an sie zu lieben.