A konkurrens programozásban a monitor szinkronizációs eszköz, ami kombinálja a kölcsönös kizárást és egy feltétel teljesülésére való várakozást. A monitorok tudnak jelezni a szálaknak, ha a feltétel teljesült. A monitor tartalmaz egy zárobjektumot és feltételváltozókat. A feltételváltozó szálakat tartalmaz, amelyek a feltétel teljesülésére várnak. Mechanizmusokat biztosítanak a szálaknak, hogy ideiglenesen feladják kizárólagos hozzáférésüket, amíg nem teljesülnek bizonyos feltételek, mielőtt visszaszerzik a kizárólagos hozzáférésüket és befejezhetik feladatukat.
Egy másik definíció szerint a monitor szálbiztos osztály, objektum vagy modul, ami magába csomagolja a kölcsönös kizárást, hogy egyetlen szálnál több is hozzáférjen egy változóhoz vagy metódushoz. Definiáló jellemzője, hogy metódusai csak kölcsönös kizárással hajthatók végre, minden metódusát egyszerre csak egy szál hajthatja végre. Feltételváltozók biztosítják, hogy a szálak csak bizonyos feltételek teljesülése esetén férhessenek az objektumhoz. A továbbiakban ekként hivatkozunk majd a monitorra.
Azt mondjuk, hogy amíg egy szál végrehajtja a monitor műveleteit, addig elfoglalja az objektumot, birtokolva annak zárját. A szálbiztos objektumok úgy vannak megvalósítva, hogy kikényszerítsék azt az elvet, hogy az objektumot egyszerre csak egy szál foglalhatja el. A zár kezdetben nyitva van. Publikus metódus hívása zárja, visszatérése kinyitja.
A metódusok egyikének meghívásakor a szálnak várnia kell, amíg az objektum fel nem szabadul, mielőtt meghívhaná a metódust. Kölcsönös kizárás nélkül pénz tűnhet el vagy bukkanhat fel a semmiből. Például két szál 1000 egységet vonna le, és igazat kapna vissza, de a levonás csak egyszer lenne elvégezve:,
Mindkét szál lekéri az egyenleget.
Elvégzik az összehasonlítást.
Elvégzik a kivonást.
Az egyik szál beállítja az egyenleget.
A másik szál beállítja az egyenleget.
A fenti "monitor class" egy szintaktikus cukor, amivel a fenti osztály a következőnek felel meg:
Néha nem elég a kölcsönös kizárás, feltétel is kell. A várakozás legnaivabb módja a busy waiting, aminek pszeudokódja:
whilenot( P ) doskip
Ez azonban akadályozza, hogy a többi szál továbbhaladjon. Az akadályoztatás függ az ütemezőtől, a round robint nem érinti, viszont a FIFO és a hasonló ütemezők esetén a program holtpontba kerül. Ez rontja a hordozhatóságot. Ennek egy másik változata az, hogy csak bizonyos időnként ellenőrzi a feltételt, ezt az időt azonban nem egyszerű beállítani, és szintén hordozhatósági problémát okozhat. Ha túl rövid, akkor a várakozók túl sokat foglalják a processzort, megnövelik a válaszadási időt. Ha túl nagy, akkor szükségtelenül hosszú ideig várakozhatnak, ami megint megnöveli a válaszadási időt.
Ehelyett inkább jelzést küldenek a várakozó szálaknak, ha feltétel (valószínűleg) igaz.
Példa: Termelő-fogyasztó minta
Egy klasszikus példa a termelő-fogyasztó minta, korlátos bufferral. A buffer sor vagy gyűrű buffer egy adott maximális mérettel. A példában úgy kezeljük, hogy a termelő feladatokat helyez a bufferba, a fogyasztó pedig kiveszi őket, és végrehajtja. A sort magát nem tekintjük szálbiztosnak, és lehet üres, részben töltött vagy teli. Ha teli van, akkor a termelőnek várnia kell a fogyasztóra; ha üres, akkor a fogyasztónak kell várnia a termelőre.
Mivel a tár nem tekinthető szálbiztosnak, gondoskodni kell védelméről. Mivel a kontextusváltások bármikor megtörténhetnek, lehet, hogy a buffer éppen inkonzisztens állapotban van. Emiatt minden szálban kritikus szakasszá kell tenni azt a kódrészletet, ami hozzáfér a bufferhoz, és ezt kölcsönös kizárással szinkronizálni kell. Szinkronizáció nélkül versenyhelyzet adódhat a buffer állapota és a tetszőleges összefésülés miatt.
Inkorrekt megoldás szinkronizáció nélkül
Egy naiv megoldás a busy-waitingre való hagyatkozással készül, szinkronizáció nélkül:
globalRingBufferqueue;// A thread-unsafe ring-buffer of tasks.// Method representing each producer thread's behavior:publicmethodproducer(){while(true){taskmyTask=...;// Producer makes some new task to be added.while(queue.isFull()){}// Busy-wait until the queue is non-full.queue.enqueue(myTask);// Add the task to the queue.}}// Method representing each consumer thread's behavior:publicmethodconsumer(){while(true){while(queue.isEmpty()){}// Busy-wait until the queue is non-empty.myTask=queue.dequeue();// Take a task off of the queue.doStuff(myTask);// Go off and do something with the task.}}
Ennek az a problémája, hogy betétel és kivétel közben elvehetik a vezérlést a száltól, és a másiknak adhatják. A queue.enqueue és queue.dequeue metódusok nem atomiak, félbeszakításuk azt eredményezheti, hogy a sor hossza nem az lesz, mint ami a változókban van. Mivel a queue.isEmpty() és a queue.isFull() függvények ez alapján döntenek, ez hibás döntéseket eredményez a szálaknál, amitől a program összeomlik, vagy nem definiált módon kezd el működni, beláthatatlan következményekkel.
Busy waiting
A busy waiting a szinkronizáció egyik naiv megközelítése. Itt a kritikus szakaszt mutex védi, a zárat pedig minden ciklus megszerzi és elengedi ellenőrzés céljára.
globalRingBufferqueue;// A thread-unsafe ring-buffer of tasks.globalLockqueueLock;// A mutex for the ring-buffer of tasks.// Method representing each producer thread's behavior:publicmethodproducer(){while(true){taskmyTask=...;// Producer makes some new task to be added.queueLock.acquire();// Acquire lock for initial busy-wait check.while(queue.isFull()){// Busy-wait until the queue is non-full.queueLock.release();// Drop the lock temporarily to allow a chance for other threads// needing queueLock to run so that a consumer might take a task.queueLock.acquire();// Re-acquire the lock for the next call to "queue.isFull()".}queue.enqueue(myTask);// Add the task to the queue.queueLock.release();// Drop the queue lock until we need it again to add the next task.}}// Method representing each consumer thread's behavior:publicmethodconsumer(){while(true){queueLock.acquire();// Acquire lock for initial busy-wait check.while(queue.isEmpty()){// Busy-wait until the queue is non-empty.queueLock.release();// Drop the lock temporarily to allow a chance for other threads// needing queueLock to run so that a producer might add a task.queueLock.acquire();// Re-acquire the lock for the next call to "queue.isEmpty()".}myTask=queue.dequeue();// Take a task off of the queue.queueLock.release();// Drop the queue lock until we need it again to take off the next task.doStuff(myTask);// Go off and do something with the task.}}
Ez kizárja a nem megengedett állapotokat, de pazarolja az erőforrásokat. A zár megszerzése és elengedése akadályozza a többi szál munkáját, azaz a feltétel teljesülését. Ha teli van a sor, akkor a termelők, ha üres, akkor a fogyasztók foglalják a processzoridőt érdemi munka nélkül. Kevésbé erőforrás-igényes, ha a kérdezgetés helyett ezeket a szálakat megvárakoztatják, és szólnak nekik, ha a feltétel teljesül.
Ehhez járul, hogy a mutexek is lehetnek busy waitinggel megvalósítva, úgyhogy ezzel beszorzódna az erőforrások pazarlása.
Feltételváltozók bevezetése
A megoldás a feltételváltozók bevezetése. A feltételváltozók tartalmaznak egy feltételt, egy monitor objektumot és egy várakozási sort. A várakozási sorban levő szálak még nem foglalják el a monitort, így egy másik szál beléphet egy másik feltétellel, és megváltoztathatja a monitor állapotát. Ha ezzel egy másik feltétel igazzá vált, akkor a monitor vagy a szál igazzá teszi a megfelelő feltételváltozót.
A feltételváltozókon három alapművelet van:
wait c, m, ahol c feltételváltozó, m pedig a monitor zárja. Ezt az a szál hívja, ami a monitorra vár, és a neki megfelelő állapotot a c feltételváltozó jelzi. A várakozás alatt nem foglalja el a monitort. A következő lépésekből áll:
Atomosan:
elengedi a mutexet,
a szálat áthelyezi a futók közül a várakozók közé, és elaltatja.
Ha értesítést kapnak a szálak, akkor ez ébred fel először.
Az első két lépés sorrendje tetszőleges, a harmadik ezután bármikor bekövetkezhet. A szál a wait művelet közben elalszik, és amikor felébred, a wait műveletben találja magát. Az első két lépés atomivá való összefogása a versenyhelyzet elkerülését szolgálja. Ennek egyik hibája az elmulasztott ébresztés lenne, amikor az ütemező éppen elalvás előtt venné el a vezérlést a száltól. Az nem kapná meg az értesítést, mivel még nem alszik, majd aludni menne, amiből már nem ébresztenék fel. Lehetségesek további versenyhelyzetek is a két lépés sorrendjétől és a kontextusváltástól függően.
signal c más néven notify c, ezzel jelzi a szál vagy a monitor, hogy a c feltételváltozó igazra váltott. Ez implementációfüggően egy vagy több várakozó szálat ébreszt fel, amelyek közül valamelyik elfoglalhatja a monitort. Ajánlott a monitor elengedése előtt kiadni, de a szál utána is kiadhatja, ha a kódot megfelelően tervezték a konkurrens programozáshoz és a szálak megvalósítása is megfelelő, ugyanis a két művelet sorrendje hathat a prioritásokra. Az API- dokumentációjának részletesen kell ezzel foglalkoznia.
A broadcast c, más néven notifyAll c a signal c-hez hasonló művelet, de ez az összes c feltételváltozóra várakozó szálat felébreszti. Ezzel a várakozósor kiürül, de amelyik szál nem tudott belépni, az vissza is kerül. Előfordulhat, hogy erre van szükség, mivel a rossz feltételre várakozó szál felébredhet, de azonnal vissza is mehet aludni, anélkül hogy értesítené a jó feltételre várakozó szálat. Egyébként lehet, hogy a signal c hatékonyabb.
Egy monitorhoz tartozhat több feltételváltozó, de egy feltételváltozót több monitorhoz használni rossz tervezési minta. Ekkor ugyanis a két monitor objektum nem használható egymástól függetlenül. A feltételváltozók szerint a szálak csoportokba tartoznak, például termelők és fogyasztók. A blokkolja a termelőket, míg a fogyasztóknak arra kell várniuk, hogy ne teljesüljön. Az egyik csoport egy szála lefutása után a másik csoportból valaki hozzáférhet a monitorhoz, hiszen a termelő betett egy elemet, vagy egy fogyasztó eltávolított egyet, ezért van elvehető elem, vagy hely az elemnek. A monitornak lehetnek olyan műveletei is, amelyeknek nincs előfeltétele.
A monitor használata
A monitor használatának egyszerűsített leírása:
acquire(m);// A monitor zárjának megszerzése.while(!p){// Várunk, amíag nem teljesül a feltétel...wait(m,cv);// A monitor zárján és feltételváltozóján várunk.}// ... Kritikus szakasz ...signal(cv2);--VAGY--notifyAll(cv2);// cv2 egyezhet cv-vel, de különbözhet is tőle.release(m);// A monitor zárjának elengedése
Bővebb kommentekkel:
// ... (bevezető szakasz)// A monitor zárjának megszerzése.// A konkurrens megosztott adatokhoz társított mutex (zár) megszerzése, //ami biztosítja, hogy a szálak ne legyenek preemptíven összefésülve vagy különböző magokon futtatva úgy, hogy // a kritikus szakaszok végrehajtása közben ugyanazokat az adatokat írják illetve olvassák.// Ha egy másik szál birtokolja ezt a mutexet, akkor a szál bekerül// az m mutex várakozási sorába, ahol ''m'' nem lehet olyan, hogy busy waitinggel kell rá várni.acquire(m);// A zár birtokában most már lehet ellenőrizni a feltételt:while(!p())// "p" kifejezés (lehet változó vagy függvényhívás) ami ellenőrzi a feltételt// és logikai értéket ad. Ez már a kritikus szakasz, tehát *KELL*// hozzá a zár!// Ha a szál nem először ellenőrzi a feltételt, akkor a kérdés:// "Most, hogy egy másik szál értesített, miután használta a monitort, és felébresztett, és megkaptam a // vezérlést, még mindig igaz-e a feltétel, vagy hamissá vált-e egy másik szál tevékenysége után?//{// Ha ez az első iteráció, akkor a válasz "nem" -- a feltétel még nem teljesül.// Egyébként a válasz: az utóbbi; egy másik szál újra hamissá tette a feltételt, hiába ébredt fel a szál,// ismét várnia kell.wait(m,cv);// Ideiglenesen megakadályozza a többi szálat, hogy műveletet hajtson végre az m mutexen vagy a cv változón.// release(m) // Atomosan elengedi "m" zárját, így egy másik szál hozzáférhet.// // A szál bekerül cv várakozási sorába, és majd értesítik,// // ha a változó igazzá válik, és elaltatja a szálat.// // Ezután más szálak újra hajthatnak végre műveleteket az m mutexen vagy a cv változón.//// Egy másik szál kapja meg a vezérlést ezen a magon.//// Valamikor később a feltétel igazzá válik,// és egy másik szál, ami ugyanezt a monitort használja, signal/notify művelettel// felébreszti ezt a szálat, vagy notifyAll hatására ébred fel, így kikerül a várakozási sorból.//// Ezalatt más szálak is megkaphatják a vezérlést, amitől újra hamissá válhat a feltétel, // többször is változhat, vagy végig igaz maradhat.//// A szál megkapja a vezérlést az egyik magon.//// acquire(m) // A szál visszaszerzi "m" zárját.// Az iteráció vége és a "while" ciklus feltételének ellenőrzése, hogy még mindig igaz-e.}// A feltétel igaz!// Továbbra is a szál birtokolja a zárat, vagy a monitorba lépéstől, vagy a // "wait" utolsó végrehajtása óta.// Ez itt a kritikus szakasz, aminek az előfeltétele éppen a cv változó.// Ennek végrehajtásával a cv feltétel hamisra válthat, és más feltételek// válhatnak igazzá.// Meghívja a signal/notify vagy notifyAll műveletet, attól függően, hogy mely feltételek váltak igazzá.for(cv_xincvs_to_notify){notify(cv_x);--OR--notifyAll(cv_x);}// Egy vagy több szál felébred, de blokkolódnak, amikor megpróbálják megszerezni az m mutexet.// A mutex elengedése, így az értesített szálak és mások beléphetnek a kritikus szakaszba.release(m);
A termelő-fogyasztó probléma megoldása
A klasszikus megoldás két monitort használ, két feltételváltozóval és egy zárral a burreren:
globalvolatileRingBufferqueue;// Nem szálbiztos gyűrű-buffer feladatok számára.globalLockqueueLock;// A buffer mutexe. (Nem busy waitinggel.)globalCVqueueEmptyCV;// Feltételváltozó a fogyasztók számára, akik arra várnak, hogy a sor ne legyen üres.// A hozzá társított zár a "queueLock".globalCVqueueFullCV;// Feltételváltozó a termelők számára, akik arra várnak, hogy a sor ne legyen teli.// A hozzá társított zár szintén a "queueLock".// A termelők viselkedését bemutató metódus:publicmethodproducer(){while(true){taskmyTask=...;// A termelő előkészíti a feladatot a hozzáadásra.queueLock.acquire();// A zár megszerzése kezdeti predikátumellenőrzésre.while(queue.isFull()){// Ellenőrzi, hogy a sor nem teli.// Arra készteti a szálkezelő rendszert, hogy atomosan elengedje a queueLock zárat,// betegye a szálat a CV sorába, és elaltassa.wait(queueLock,queueFullCV);// Ekkor a "wait" automatikusan visszaszerzi a "queueLock" zárat a predikátumfeltétel// újraellenőrzésére.}// Kritikus szakasz, ami azt igényli, hogy a sor ne legyen tele.// N.B.: A szál birtokolja a queueLock zárat.queue.enqueue(myTask);// Add the task to the queue.// A feladat sorba helyezése.// Most a sor garantáltan nem üres, ezt jelzi egy vagy több fogyasztónak,// így egy fogyasztó kiveheti a feladatot:signal(queueEmptyCV);--VAGY--notifyAll(queueEmptyCV);// A kritikus szakasz vége.queueLock.release();// A sor zárjának elengedése.}}// A fogyasztók viselkedését bemutató metódus:publicmethodconsumer(){while(true){queueLock.acquire();// A zár megszerzése kezdeti predikátumellenőrzésre.while(queue.isEmpty()){// Ellenőrzi, hogy a sor nem üres.// Arra készteti a szálkezelő rendszert, hogy atomosan elengedje a queueLock zárat,// betegye a szálat a CV sorába, és elaltassa.wait(queueLock,queueEmptyCV);// Ekkor a "wait" automatikusan visszaszerzi a "queueLock" zárat a predikátumfeltétel// újraellenőrzésére.}// Kritikus szakasz, ami azt igényli, hogy a sor ne legyen üres.// N.B.: A szál birtokolja a queueLock zárat.myTask=queue.dequeue();// Egy feladat kivétele a sorból.// Most a sor garantáltan nem teli, ezt jelzi egy vagy több termelőnek,// így egy termelő betehet egy feladatot:signal(queueFullCV);--OR--notifyAll(queueFullCV);// A kritikus szakasz vége.queueLock.release();// A sor zárjának elengedése.doStuff(myTask);// A kivett feladat felhasználása.}}
Ez biztosítja, hogy a termelők és a fogyasztók is tudják használni a közös buffert. Ha pedig egy szál amúgy sem tudna mit csinálni, akkor blokkolja, amíg nem kerül megfelelő állapotba a sor.
Ennek egy változata csak egy feltételváltozót tartalmaz, neve rendszerint queueFullOrEmptyCV vagy queueSizeChangedCV. Ekkor több feltétel használja ugyanazt a változót, emiatt ez gyengébb feltételt biztosít, mint amiket a szálaknak ellenőrizniük kell. A feltételváltozó reprezentálja mind a teli, mind az üres sor esetén várakozó szálakat. Ekkor viszont minden szálnak a signal helyett notifyAll-t kell hívnia. Ez azért van, mert különben egy rossz feltételhez tartozó szál ébredhet fel, majd aludhat vissza signal hívása nélkül; például termelő ébreszthetne fel termelőt, és fogyasztó fogyasztót. A notifyAll hatására a megfelelő típusú szálakat is felébreszti.
A következő kód ezt valósítja meg.
globalvolatileRingBufferqueue;// Nem szálbiztos gyűrű-buffer feladatok számára.globalLockqueueLock;// A buffer mutexe. (Nem busy waitinggel.)globalCVqueueFullOrEmptyCV;// Egyszerű feltételváltozó, amikor a sor nem áll készen bármely szál számára // például a termelők várnak arra, hogy ne legyen teli // vagy a fogyasztók arra, hogy ne legyen üres.// A hozzá társított zár a "queueLock".// Nem biztonságos a hagyományos "signal" használata, mivel // több feltételpredikátummal áll kapcsolatban.// A termelők viselkedését bemutató metódus:publicmethodproducer(){while(true){taskmyTask=...;// A termelő előkészíti a feladatot a hozzáadásra.queueLock.acquire();// A zár megszerzése kezdeti predikátumellenőrzésre.while(queue.isFull()){// Ellenőrzi, hogy a sor nem teli.// Arra készteti a szálkezelő rendszert, hogy atomosan elengedje a queueLock zárat,// betegye a szálat a CV sorába, és elaltassa.wait(queueLock,queueFullOrEmptyCV);// Ekkor a "wait" automatikusan visszaszerzi a "queueLock" zárat a predikátumfeltétel// újraellenőrzésére.}// Kritikus szakasz, ami azt igényli, hogy a sor ne legyen tele.// N.B.: A szál birtokolja a queueLock zárat.queue.enqueue(myTask);// A feladat sorba helyezése.// Most a sor garantáltan nem üres, ezt jelzi a többi szálnak,// így egy fogyasztó kiveheti a feladatot:notifyAll(queueFullOrEmptyCV);// Itt a "signal" holtponthoz vezethet.// A kritikus szakasz vége.queueLock.release();// A sor zárjának elengedése.}}// A fogyasztók viselkedését bemutató metódus:publicmethodconsumer(){while(true){queueLock.acquire();// A zár megszerzése kezdeti predikátumellenőrzésre.while(queue.isEmpty()){// Ellenőrzi, hogy a sor nem üres.// Arra készteti a szálkezelő rendszert, hogy atomosan elengedje a queueLock zárat,// betegye a szálat a CV sorába, és elaltassa.wait(queueLock,queueFullOrEmptyCV);// Ekkor a "wait" automatikusan visszaszerzi a "queueLock" zárat a predikátumfeltétel// újraellenőrzésére.}// Kritikus szakasz, ami azt igényli, hogy a sor ne legyen üres.// N.B.: A szál birtokolja a queueLock zárat.myTask=queue.dequeue();// Egy feladat kivétele a sorból.// Most a sor garantáltan nem teli, ezt jelzi a többi szálnak,// így egy termelő betehet egy feladatot:notifyAll(queueFullOrEmptyCV);// Itt a "signal" holtponthoz vezethet.// A kritikus szakasz vége.queueLock.release();// A sor zárjának elengedése.doStuff(myTask);// A kivett feladat felhasználása.}}
Szinkronizációs primitívek
A mutexek és a feltételváltozók bevezetése hardver által támogatott szinkronizációs primitíveket igényel az atomi tulajdonság biztosítására. A zárak és a feltételváltozók egy ezek fölötti absztrakciós szintet jelentenek. Egyprocesszoros gépen a megszakítások engedélyezése vagy tiltása a kritikus szakaszok végrehajtása alatt alkalmas a monitorok megvalósítására kontextusváltás tiltásával, de ez többprocesszoros gépeken nem megfelelő. Többprocesszoros gépeken speciális olvasó-módosító-író memóriakezelő utasításokat használnak, mint test-and-set, vagy compare-and-swap. attól függően, hogy mit tesz lehetővé az ISA. Ezek rendszerint busy waitinget használnak, de ez rendszerint nem tart sokáig. A megvalósítástól függően ezek a műveletek lezárhatják a buszt a többi magtól, és/vagy megtilthatják az utasítások átrendezését. A továbbiakban bemutatott példa egy szálkezelő rendszer része, mutexek és Mesa-stílusú feltételváltozók, a test-and-set művelettel, és FIFO-val. Csak a mutexekkel és feltételváltozókkal kapcsolatos részeket mutatjuk:
// A szálkezelő rendszer alapvető részei:// Feltesszük, hogy "ThreadQueue" támogatja a tetszőleges hozzáférést.publicvolatileThreadQueuereadyQueue;// A futásra kész szálak nem szálbiztos tárolója. Az elemek típusa (Thread*).publicvolatileglobalThread*currentThread;// Feltesszük, hogy ezek [[Szálspecifikus tároló|szálspecifikusak]]. (A többi közös.)// Busy waitinget használ, vagy a szálkezelő rendszer szinkronizált állapotát.// Ezt a test-and-set művelettel használják, mint szinkronizációs primitívet.publicvolatileglobalboolthreadingSystemBusy=false;// Kontextusváltásos megszakító szolgáltatás rutin (ISR):// Ezen a CPU magon preemptíven egy másik szálra vált.publicmethodcontextSwitchISR(){if(testAndSet(threadingSystemBusy)){return;// Most még nem lehet kontextust váltani. }// Biztosítjuk, hogy ez a váltás ne történjen meg még egyszer, ami elronthatná a kontextusváltást:systemCall_disableInterrupts();// Az éppen futó szálhoz tartozó regiszterek értékeinek elkérése.// A programszámláló (Program Counter, PC) számára szükség van // az alábbi "resume" címke helyére az utasítások között. A regiszterek értékeinek lekérése platformfüggő, és tartalmazhatja// a jelenlegi stack frame, JMP/CALL utasítások, etc. méretét. (Részletek alább.)currentThread->registers=getAllRegisters();// A regisztereket eltárolja a "currentThread" objektumban a memóriában.currentThread->registers.PC=resume;// Beállítja a következő PC-t a metódus "resume" címkéjére.readyQueue.enqueue(currentThread);// Visszateszi ezt a szálat a futásra készek közé további végrehajtásra.Thread*otherThread=readyQueue.dequeue();// A futásra kész sorból kiveszi a következő szálat.currentThread=otherThread;// Helyettesíti a globális jelenlegi szál mutatót, hogy az új szálra mutasson.// Visszatölti a regisztereket a currentThread/otherThread változóból, köztük egy ugrást az új szál PC-jére// (az alábbi "resume"). Ennek részleteit itt most mellőzzük.restoreRegisters(otherThread.registers);// *** Most az "otherThread" lesz az aktuális (ami most már "currentThread")! az eredeti szál most alszik. ***resume:// Itt most egy másik contextSwitch() hívásra van szükség a PC beállítására, to when switching context back here.// Visszatér oda, ahol az otherThread futása abbamaradt.threadingSystemBusy=false;// Atomi értékadásnak kell lennie.systemCall_enableInterrupts();// Visszaállítja a preemptív váltást ezen a magon.}// Szálaltatási metódus:// Ezen a CPU magon szinkron kontextusváltást hajt végre egy másik szálra anélkül, hogy // az aktuális szálat a futásra kész sorba tenné.// Ehhez birtokolni kell a "threadingSystemBusy" objektumot és letikltani a megszakításokat, hogy // egy kontextusváltás ne szakíthassa meg ezt a metódust, a contextSwitchISR() függvényt hívná.// Visszatéréskor ki kell takarítani a "threadingSystemBusy" objektumot.publicmethodthreadSleep(){// Az éppen futó szálhoz tartozó regiszterek értékeinek elkérése.// A programszámláló (Program Counter, PC) számára szükség van // az alábbi "resume" címke helyére az utasítások között. A regiszterek értékeinek lekérése platformfüggő, és tartalmazhatja// a jelenlegi stack frame, JMP/CALL utasítások, etc. méretét. (Részletek alább.)currentThread->registers=getAllRegisters();// A regisztereket eltárolja a "currentThread" objektumban a memóriában.currentThread->registers.PC=resume;// Beállítja a következő PC-t a metódus "resume" címkéjére.// A contextSwitchISR() metódustól eltérően nem teszi vissza a currentThread szálat a readyQueue sorba.// Ehelyett már benne van egy mutex vagy feltételváltozó várakozási sorába.Thread*otherThread=readyQueue.dequeue();// A futásra kész sorból kiveszi a következő szálat.currentThread=otherThread;// Helyettesíti a globális jelenlegi szál mutatót, hogy az új szálra mutasson.// Visszatölti a regisztereket a currentThread/otherThread változóból, köztük egy ugrást az új szál PC-jére// (az alábbi "resume"). Ennek részleteit itt most mellőzzük.restoreRegisters(otherThread.registers);// *** Most az "otherThread" lesz az aktuális (ami most már "currentThread")! az eredeti szál most alszik. ***resume:// Itt most egy másik contextSwitch() hívásra van szükség a PC beállítására, to when switching context back here.// Visszatér oda, ahol az otherThread futása abbamaradt.}publicmethodwait(Mutexm,ConditionVariablec){// Belső busy waiting zár, amíg egy másik szál hozzá nem fér ennek az objektumnak a következő elemeihez:// "held", "threadQueue", vagy "readyQueue".while(testAndSet(threadingSystemBusy)){}// N.B.: "threadingSystemBusy" most igaz.// Rendszerhívás, ami letiltja a megszakításokat ezen a magon, így a threadSleep() nem szakadhat meg// ezen a magon, ami a contextSwitchISR() függvényt hívná.// A nagyobb hatékonyság érdekében a threadSleep() függvényen kívül hívja, így ez a szál el lesz altatva, // mihelyt a feltételváltozó várakozási sorába kerül.systemCall_disableInterrupts();assertm.held;// (Speciálisan, ennek a szálnak kell birtokolnia.)m.release();c.waitingThreads.enqueue(currentThread);threadSleep();// A szál alszik ... A szál felébred egy signal/broadcast hatására.threadingSystemBusy=false;// Atomi értékadásnak kell lennie.systemCall_enableInterrupts();// Visszaállítja a preemptív váltást ezen a magon.// Mesa stílus:// Kontextusváltás történhet, ami hamissá teheti a hívó predikátumát.m.acquire();}publicmethodsignal(ConditionVariablec){// Belső busy waiting zár, amíg egy másik szál hozzá nem fér ennek az objektumnak a következő elemeihez:// "held", "threadQueue", vagy "readyQueue".while(testAndSet(threadingSystemBusy)){}// N.B.: "threadingSystemBusy" most igaz.// Rendszerhívás, ami letiltja a megszakításokat ezen a magon, így a threadSleep() nem szakadhat meg// ezen a magon, ami a contextSwitchISR() függvényt hívná.// A nagyobb hatékonyság érdekében a threadSleep() függvényen kívül hívja, így ez a szál el lesz altatva, // mihelyt a feltételváltozó várakozási sorába kerül.systemCall_disableInterrupts();if(!c.waitingThreads.isEmpty()){wokenThread=c.waitingThreads.dequeue();readyQueue.enqueue(wokenThread);}threadingSystemBusy=false;// Atomi értékadásnak kell lennie.systemCall_enableInterrupts();// Visszaállítja a preemptív váltást ezen a magon.// Mesa stílus:// A felébredt szál nem kap prioritást.}publicmethodbroadcast(ConditionVariablec){// Belső busy waiting zár, amíg egy másik szál hozzá nem fér ennek az objektumnak a következő elemeihez:// "held", "threadQueue", vagy "readyQueue".while(testAndSet(threadingSystemBusy)){}// N.B.: "threadingSystemBusy" most igaz.// Rendszerhívás, ami letiltja a megszakításokat ezen a magon, így a threadSleep() nem szakadhat meg// ezen a magon, ami a contextSwitchISR() függvényt hívná.// A nagyobb hatékonyság érdekében a threadSleep() függvényen kívül hívja, így ez a szál el lesz altatva, // mihelyt a feltételváltozó várakozási sorába kerül.systemCall_disableInterrupts();while(!c.waitingThreads.isEmpty()){wokenThread=c.waitingThreads.dequeue();readyQueue.enqueue(wokenThread);}threadingSystemBusy=false;// Atomi értékadásnak kell lennie.systemCall_enableInterrupts();// Visszaállítja a preemptív váltást ezen a magon.// Mesa stílus:// A felébredt szál nem kap prioritást.}classMutex{protectedvolatileboolheld=false;privatevolatileThreadQueueblockingThreads;// Thread-unsafe queue of blocked threads. Elements are (Thread*).publicmethodacquire(){// Belső busy waiting zár, amíg egy másik szál hozzá nem fér ennek az objektumnak a következő elemeihez:// "held", "threadQueue", vagy "readyQueue".while(testAndSet(threadingSystemBusy)){}// N.B.: "threadingSystemBusy" most igaz.// Rendszerhívás, ami letiltja a megszakításokat ezen a magon, így a threadSleep() nem szakadhat meg// ezen a magon, ami a contextSwitchISR() függvényt hívná.// A nagyobb hatékonyság érdekében a threadSleep() függvényen kívül hívja, így ez a szál el lesz altatva, // mihelyt a feltételváltozó várakozási sorába kerül.systemCall_disableInterrupts();assert!blockingThreads.contains(currentThread);if(held){// A "currentThread" szálat beteszi ennek a zárnak a várakozási sorába, // így ezen a záron alszik.// Jegyezzük meg, hogy a "currentThread"szálat még a threadSleep() függvénynek is kezelnie kell.readyQueue.remove(currentThread);blockingThreads.enqueue(currentThread);threadSleep();// Felébredtünk, ez azért van, mert a "held" hamisra váltott.assert!held;assert!blockingThreads.contains(currentThread);}held=true;threadingSystemBusy=false;// Atomi értékadásnak kell lennie.systemCall_enableInterrupts();// Visszaállítja a preemptív váltást ezen a magon.}publicmethodrelease(){/// Belső busy waiting zár, amíg egy másik szál hozzá nem fér ennek az objektumnak a következő elemeihez:// "held", "threadQueue", vagy "readyQueue".while(testAndSet(threadingSystemBusy)){}// N.B.: "threadingSystemBusy" most igaz.// Rendszerhívás, ami a hatékonyság letiltja a megszakításokat ezen a magon.systemCall_disableInterrupts();assertheld;// (Elengedés csak akkor, ha a zár birtokában volt.)held=false;if(!blockingThreads.isEmpty()){Thread*unblockedThread=blockingThreads.dequeue();readyQueue.enqueue(unblockedThread);}threadingSystemBusy=false;// Atomi értékadásnak kell lennie.systemCall_enableInterrupts();// Visszaállítja a preemptív váltást ezen a magon.}}structConditionVariable{volatileThreadQueuewaitingThreads;}
Szemafor
Példaként tekintsünk egy szálbiztos osztályt, ami szemafort valósít meg! A metódusok egy privát s egészt növelnek (V) vagy csökkentenek (P). Az egész nem csökkenhet nulla alá, így ha egy szál további csökkentést igényel, akkor várnia kell.
Az alábbi megvalósítás egy sIsPositive feltételváltozót használ, amihez az az állítás tartozik, hogy .
monitor classSemaphore
{
privateint s := 0
invariant s >= 0
privateCondition sIsPositive /* associated with s > 0 */public method P()
{
while s = 0:
wait sIsPositive
assert s > 0
s := s – 1
}
public method V()
{
s := s + 1
assert s > 0
signal sIsPositive
}
}
Az összes szinkronizációt mutató megvalósítás, a szálbiztosság feltételezése nélkül:
classSemaphore
{
privatevolatileint s := 0
invariant s >= 0
privateConditionVariable sIsPositive /* associated with s > 0 */privateMutex myLock /* Lock on "s" */public method P()
{
myLock.acquire()
while s = 0:
wait(myLock, sIsPositive)
assert s > 0
s := s – 1
myLock.release()
}
public method V()
{
myLock.acquire()
s := s + 1
assert s > 0
signal sIsPositive
myLock.release()
}
}
Monitor szemaforokkal
Megfordítva, a zárak és a feltételváltozók is származtathatók szemaforokból, emiatt a szemaforok és a monitorok egymásra redukálhatók.
A következő kód inkorrekt. Ha egy szál signal() után hív wait() metódust, akkor holtpontba kerülhet, mivel a signal() csak annyiszor növeli a szemafort, ahány szál éppen várakozik.
publicmethodwait(Mutexm,ConditionVariablec){assertm.held;c.internalMutex.acquire();c.numWaiters++;m.release();// Can go before/after the neighboring lines.c.internalMutex.release();// Another thread could signal here, but that's OK because of how// semaphores count. If c.sem's number becomes 1, we'll have no// waiting time.c.sem.Proberen();// Block on the CV.// Wokenm.acquire();// Re-acquire the mutex.}publicmethodsignal(ConditionVariablec){c.internalMutex.acquire();if(c.numWaiters>0){c.numWaiters--;c.sem.Verhogen();// (Doesn't need to be protected by c.internalMutex.)}c.internalMutex.release();}publicmethodbroadcast(ConditionVariablec){c.internalMutex.acquire();while(c.numWaiters>0){c.numWaiters--;c.sem.Verhogen();// (Doesn't need to be protected by c.internalMutex.)}c.internalMutex.release();}classMutex{protectedbooleanheld=false;// For assertions only, to make sure sem's number never goes > 1.protectedSemaphoresem=Semaphore(1);// The number shall always be at most 1.// Not held <--> 1; held <--> 0.publicmethodacquire(){sem.Proberen();assert!held;held=true;}publicmethodrelease(){assertheld;// Make sure we never Verhogen sem above 1. That would be bad.held=false;sem.Verhogen();}}classConditionVariable{protectedintnumWaiters=0;// Roughly tracks the number of waiters blocked in sem.// (The semaphore's internal state is necessarily private.)protectedSemaphoresem=Semaphore(0);// Provides the wait queue.protectedMutexinternalMutex;// (Really another Semaphore. Protects "numWaiters".)}
Ha a signal egy olyan feltételváltozón történik, ami éppen várakozik egy vagy több szál, akkor van legalább két szál, ami elfoglalhatja a monitort: a signal jelzést adó és egy várakozó szál. Ahhoz, hogy ez ne történjen meg, két lehetséges megoldás van. Vannak, akik az egyik, mások a másik megoldást választják:
Blokkoló feltételváltozók: a jelzést kapott szálnak ad elsőbbséget
Nem blokkoló feltételváltozók: a jeladó szálnak van elsőbbsége.
Ezeket majd megvizsgáljuk.
Blokkoló feltételváltozók
A blokkoló feltételváltozók C. A. R. Hoare és Per Brinch Hansen eredeti javaslata volt. Ekkor a jeladónak a monitoron kívül kell várakoznia, és a jelt kapó foglalja el a monitort. Ezeket a monitorokat Hoare-féle vagy jel-sürgős-várás (signal-and-urgent-wait) monitoroknak nevezik.
A monitor objektumhoz szálak két sora tartozik:
e a belépők
s a jeladók sora.
Továbbá minden c feltételváltozóhoz van c.q sor, ami a feltételváltozón vár. Minden sorról felteszik, hogy fair, és egyes megvalósításokban a FIFO is garantált.
Az alábbiakban az egyes műveletek megvalósítása következik. Feltételezzük, hogy ezek kölcsönös kizárással futnak, így az újraindított szálak nem kezdenek futni, amíg teljesen be nem fejeződött a művelet.
belépés a monitorba:
belépés a metódusba
ha a monitor foglalt
add ezt a szálat az e sorhoz
blokkold ezt a szálat
különben
foglald el a monitort
a monitor elhagyása:
schedule
return a metódusból
waitc :
add ezt a szálat a c.q sorhoz
schedule
blokkold ezt a szálat
signalc :
ha van a c.q soron várakozó szál
válassz egy ilyet, legyen ez t, és távolítsd el a c.q sorból
(t a jelt kapott szál)
add ezt a szálat az s sorhoz
indítsd újra a t szálat
(így t elfoglalja a monitort)
blokkold ezt a szálat
schedule :
ha van az s sorban várakozó szál
válassz egyet, távolítsd el az s sorból és indítsd újra
(a szál elfoglalja a monitort)
hogyha van várakozó szál az e sorban
válassz egyet, távolítsd el az e és indítsd újra
(a szál elfoglalja a monitort)
különben
szabadítsd fel a monitort monitor
(a monitor szabad, bárki elfoglalhatja)
A schedule rutin kiválasztja a következő szálat, ami elfoglalja a monitort, vagy ha nincs jelölt, akkor felszabadítja.
Az eredmény a jeladás és sürgős várakozás, ami azt jelenti, hogy a jeladónak várnia kell, de elsőbbséget kap. Alternatívája az adj jelt és várakozz, amikor nem kap prioritást. Ekkor nincs külön s sor, mindenki az e sorban vár.
Egyes megvalósítások a jelezz és térj vissza műveletet nyújtják, ami a jeladást visszatéréssel kombinálja.
signalcand return :
ha van a c.q sorban várakozó szál
válassz egyet, legyen ez t és távolítsd el a c.q sorból
(t a jelt kapott szál)
indítsd újra a t szálat
(t elfoglalja a monitort)
különben
schedule
return a metódusból
Akár a jelzés és sürgős várakozás, akár a jelez és várj esetén, ha egy feltételváltozó kap jelt, és van legalább egy szál a feltételváltozó várakozó sorában, a jelző szál átadja a monitort a jelzett szálnak, más szál nem szerezheti meg a kettő között. Ha Pc igaz minden signal(c) művelet elején, akkor minden wait művelet végén is igaz lesz. Ez összegezhető a következő szerződésekkel, ahol I a monitor invariánsa.
enter the monitor:
utófeltételI
leave the monitor:
előfeltételI
waitc :
előfeltételImódosítja a monitor állapotát
utófeltételPc és I
signalc :
előfeltételPc és Imódosítja a monitor állapotát
utófeltételI
signalcand return :
előfeltételPc és I
Ezek a szerződések feltételezik, hogy I és Pc független a sorok hosszától és tartalmától.
Ha a feltételváltozó függ a sorban várakozó szálak számától, akkor bonyolultabbak lesznek a szerződések. Például egy hasznos szerződéspár, ami megengedi a foglalás átadását az invariáns módosítása nélkül:
waitc :
előfeltételImódosítja a monitor állapotát
utófeltételPc
signalcelőfeltétel (nem üres(c) ésPc) vagy (üres(c) ésI)
módosítja a monitor állapotát
utófeltételI
Itt meg kell jegyezni, hogy Pc teljesen a programozótól függ, neki kell úgy megalkotnia, hogy konzisztens legyen.
A következő példa szálbiztos korlátos vermet valósít meg:
monitor classSharedStack {
private const capacity := 10
privateint[capacity] A
privateint size := 0
invariant 0 <= size and size <= capacity
privateBlockingCondition theStackIsNotEmpty /* associated with 0 < size and size <= capacity */privateBlockingCondition theStackIsNotFull /* associated with 0 <= size and size < capacity */
public method push(int value)
{
if size = capacity thenwait theStackIsNotFull
assert 0 <= size and size < capacity
A[size] := value ; size := size + 1
assert 0 < size and size <= capacity
signal theStackIsNotEmpty and return
}
public methodint pop()
{
if size = 0 thenwait theStackIsNotEmpty
assert 0 < size and size <= capacity
size := size – 1 ;
assert 0 <= size and size < capacity
signal theStackIsNotFull and return A[size]
}
}
Ebben a példában a szálbiztosságot belsőleg mutex valósítja meg. Ezen két feltételváltozó osztozik, amelyek különböző feltételeket ellenőriznek ugyanazokon az adatokon. A fenti termelő-fogyasztó példától abban különbözik, hogy az nem szálbiztos sort használ, és az adatstruktúrán kívül szerepel a mutex és a feltételváltozók. A példában a wait művelet a monitor integrált műveleteként szerepel. Az absztrakt működéstől eltekintve minden monitor tartalmaz mutexeket és feltételváltozókat, ahol minden mutex feltételváltozókkal van ellátva.
Nem blokkoló feltételváltozók
A nem blokkoló feltételváltozók használatát Mesa stílusnak vagy jelezz és folytasd stílusnak is nevezik. A jeladó szálnak nem kell elhagynia a monitort, hanem a jelt kapott szálnak kell várakoznia az e sorban. Ebben a stílusban nincs szükség s sorra.
Ebben a stílusban a signal műveletet notify műveletnek is nevezik, a továbbiakban mi is ezt használjuk. Emellett szokás még egy notify all műveletet is definiálni, ami az összes feltételváltozón várakozó szálat áthelyezi az e sorba.
A műveleteket a következőkben írjuk le. Feltesszük, hogy mindezek egymáshoz képest kölcsönös kizárással futnak. Így az újraindított szálak mindaddig nem futnak, amíg a művelet nem ért véget.
enter the monitor:
belépés a metódusba
ha a monitor zárolva
add ezt a szálat az e sorhoz
blokkold ezt a szálat
különben
zárold a monitort
leave the monitor:
schedule
térj vissza a metódusból
waitc :
add ezt a szálat a c.q sorhoz
schedule
blokkold ezt a szálat
notifyc :
ha van a c.q sorban várakozó szál
válassz egyet, legyen ez t, és távolítsd el a t szálat a c.q sorból
(t az értesített szál)
mozgasd a t szálat az e sorba
notify allc :
mozgasd az összes c.q sorban várakozó szálat az e sorba
schedule :
ha van az e sorban várakozó szál
válassz egyet, távolítsd el az e sorból és indítsd újra
különben
szabadítsd fel a monitort
Ennek egy változatában az értesített szálak a w sorba kerülnek, és ennek elsőbbsége van az e sorral szemben. Lásd Howard[4] and Buhr et al.[5]
Minden feltételváltozóhoz meg lehet adni egy állítást, ami a wait után igazzá válik. Azonban biztosítani kell, hogy ez igaz maradjon mindaddig, amíg az értesített szál is be nem léphet. Így a feltétel többnyire igaz.
Éppen ezért szükséges a wait művelet ciklusba ágyazása:
whilenot( P ) dowait c
ahol P erősebb feltétel, mint Pc. A notify és a notify all műveletek utalnak arra, hogy P igaz lehet valamelyik várakozó szálra.
A fenti ciklusban az első után mindegyik futás egy elveszett értesítésnek felel meg. Éppen ezért arra kell törekedni, hogy ne vesszenek el értesítések.
Erre példa egy bankszámla, ahol a levonást végző szálnak meg kell várnia, hogy a számlán legyen elég pénz a levonáshoz:
public method deposit(int amount)
precondition amount >= 0
{
balance := balance + amount
notify all balanceMayBeBigEnough
}
}
Itt a betevő szál nem tudja, hogy teljesült-e a feltétel, vagyis megvan-e a kívánt összeg. Ekkor minden várakozó szálnak lehetőséget kell adni arra, hogy megnézze, igaz-e a feltétele.
Implicit feltételváltozó monitorok
Nevezik Java stílusnak is. A Java nyelvben minden objektum használható monitorként, ehhez explicit meg kell jelölni a synchronized kulcsszóval. Metódusok, blokkok szintén lehetnek synchronizedek.
Explicit feltételváltozók helyett egyetlen várakozósoruk van. A notify és a notifyAll ezen a soron hat. Más nyelvek is adoptálták ezt a stílust, például a C#.
Implicit jelzés
Jel küldése helyett mindig, amikor egy szál elhagyja a monitort, a várakozó szálak feltételei kiértékelődnek. Ha egy feltétel igaz, akkor a szál elfoglalja a monitort. Ekkor nincs szükség feltételváltozókra, viszont a szálak feltételeit explicit le kell kódolni. A wait szerződése:
wait P:
előfeltétel I
módosítja a monitor állapotát
utófeltétel P és I
Története
Brinch Hansen és Hoare az 1970-es évek elején dolgozta ki a monitorokat korábbi ötleteik és Edsger Dijkstra ötletei alapján.[6] Brinch Hansen írta az első cikket, amihez a Simula 67 osztály fogalmát használta fel,[1] és egy sorkezelést is beiktatott.[7] Hoare újradefiniálta a folyamatok újraindítását, folytatását.[2] Brinch Hansen írta az első implementációt a Concurrent Pascalhoz, és Hoare demonstrálta ekvivalenciáját a szemaforral.[6] A Concurrent Pascal közvetítette a fogalmat a Solo operációs rendszerbe, ahol a folyamatok szinkronizálását strukturálta.[8][9]
Programnyelvi támogatás
A monitorok számos programozási nyelvben és könyvtárban megtalálhatók.
Több könyvtár is támogatja a monitort, például a Pthreads, így az olyan nyelvekben is használható, amelyekbe nincs beépítve. Ezekben explicit meg kell jelölni azokat a kódszakaszokat, amelyek kölcsönös kizárással futnak.
↑ ab (1976) „Signaling in monitors”. International Conference on Software EngineeringICSE '76 Proceedings of the 2nd international conference on Software engineering: 47–52, Los Alamitos, CA, USA: IEEE Computer Society Press.
↑ ab (1993) „Monitors and concurrent Pascal: a personal history”. History of Programming LanguagesHOPL-II: The second ACM SIGPLAN conference on History of programming languages: 1–35, New York, NY, USA: ACM. doi:10.1145/155360.155361.
↑Brinch Hansen, Per (1972. július 1.). „Structured multiprogramming (Invited Paper)”. Communications of the ACM.
Ez a szócikk részben vagy egészben a Monitor (synchronization) című angol Wikipédia-szócikk fordításán alapul. Az eredeti cikk szerkesztőit annak laptörténete sorolja fel. Ez a jelzés csupán a megfogalmazás eredetét és a szerzői jogokat jelzi, nem szolgál a cikkben szereplő információk forrásmegjelöléseként.