Send ActionScript Worker Messages 6x Faster With Mutex
While ActionScript Workers made their debut in Flash Player 11.4, the
The Mutex class is special in a couple of ways. First, when you call The second special functionality is the sole reason for

function doubleFirstByte(bytes:ByteArray,mutex:Mutex): void
{
    // Lock the Mutex. If the Mutex is already locked by another worker,
    // this worker will be suspended until the Mutex is unlocked. At that
    // point the lock() function will return.
    mutex.lock();

    // The Mutex has now been locked by our thread. In this example we
    // take that to mean that the ByteArray is free for us to manipulate.
    // Here we simply double the first byte's value.
    bytes[0] *= 2;

    // Now that we're done with the Mutex we need to tell other workers
    // so they can resume and their calls to lock() will return. Calling
    // unlock does exactly this.
    mutex.unlock();
}

Imagine two workers running the

Initial value is set to 1
Worker A executes and reads 1
Worker B executes and reads 1
Worker A executes and writes 2
Worker B executes and writes 2
Final result after two runs: 2
Expected result: 4

Any number of scenarios could play out because you don’t know the order that the threads will execute in. Hoping for the best simply won’t do, so you need a way to communicate between the threads and avoid contention over the shared

// Check for the incoming message over and over until it's received
while (worker.getSharedProperty("message") == inMsg)
{
}

Essentially, the receiving worker would spend 100% of the CPU time it was allotted to simply check for incoming messages. With a Let’s see how that strategy works out.
The following is an amended version of the last article’s small test application that includes a

package
{
    import flash.display.Sprite;
    import flash.events.Event;
    import flash.utils.getTimer;
    import flash.utils.ByteArray;
    import flash.text.TextField;
    import flash.text.TextFieldAutoSize;
    import flash.system.Worker;
    import flash.system.WorkerDomain;
    import flash.system.WorkerState;
    import flash.system.MessageChannel;
    import flash.concurrent.Mutex;

    /**
     * Test to show the speed of passing messages between workers via three
     * methods: MessageChannel, setSharedProperty with a tight polling loop, and
     * setSharedProperty with a Mutex.
     *
     * The same SWF runs in both roles: the primordial worker drives the test
     * and displays the results, and a background worker created from the
     * SWF's own bytes answers the messages.
     *
     * @author Jackson Dunstan (JacksonDunstan.com)
     */
    public class MutexTest extends Sprite
    {
        /** Output logger */
        private var logger:TextField = new TextField();

        /**
         * Log a CSV row
         * @param cols Columns of the row
         */
        private function row(...cols): void
        {
            // Terminate each row with a real newline so rows do not run together
            logger.appendText(cols.join(",") + "\n");
        }

        /** Message channel from the main thread to the worker thread */
        private var mainToWorker:MessageChannel;

        /** Message channel from the worker thread to the main thread */
        private var workerToMain:MessageChannel;

        /** The worker thread (main thread only) */
        private var worker:Worker;

        /** Number of messages to send back and forth in the test */
        private var REPS:int = 1000;

        /** Time before the message passing test started */
        private var beforeTime:int;

        /** Current message index */
        private var cur:int;

        /**
         * Start the app in main thread or worker thread mode
         */
        public function MutexTest()
        {
            // Setup the logger
            logger.autoSize = TextFieldAutoSize.LEFT;
            addChild(logger);

            // If this is the main SWF, start the main thread
            if (Worker.current.isPrimordial)
            {
                startMainThread();
            }
            // If this is the worker thread SWF, start the worker thread
            else
            {
                startWorkerThread();
            }
        }

        /**
         * Start the main thread
         */
        private function startMainThread(): void
        {
            // Try to get a very good framerate
            stage.frameRate = 60;

            // Create the worker from our own SWF bytes
            worker = WorkerDomain.current.createWorker(this.loaderInfo.bytes);

            // Create a message channel to send to the worker thread
            mainToWorker = Worker.current.createMessageChannel(worker);
            worker.setSharedProperty("mainToWorker", mainToWorker);

            // Create a message channel to receive from the worker thread
            workerToMain = worker.createMessageChannel(Worker.current);
            workerToMain.addEventListener(Event.CHANNEL_MESSAGE, onWorkerToMainDirect);
            worker.setSharedProperty("workerToMain", workerToMain);

            // Start the worker
            worker.start();

            // Begin the test where we use MessageChannel to send messages
            // between the threads by sending the first message
            beforeTime = getTimer();
            mainToWorker.send("1");
        }

        /**
         * Start the worker thread
         */
        private function startWorkerThread(): void
        {
            // Get the message channels the main thread set up for communication
            // between the threads
            mainToWorker = Worker.current.getSharedProperty("mainToWorker");
            workerToMain = Worker.current.getSharedProperty("workerToMain");
            mainToWorker.addEventListener(Event.CHANNEL_MESSAGE, onMainToWorker);
        }

        /**
         * Callback for when a message has been received from the main thread to
         * the worker thread on a MessageChannel
         * @param ev CHANNEL_MESSAGE event
         */
        private function onMainToWorker(ev:Event): void
        {
            // Record the message and send a response
            cur++;
            workerToMain.send("1");

            // If this was the last message, prepare for the next test where the
            // two threads communicate with shared properties
            if (cur == REPS)
            {
                // We receive "1" on our own Thread (the worker thread) and send
                // "2" in response for the setSharedProperty and mutex tests
                setSharedPropertyTest(Worker.current, "1", "2", false);
                mutexTest(Worker.current, "1", "2", false);
            }
        }

        /**
         * Callback for when the worker thread sends a message to the main thread
         * via a MessageChannel
         * @param ev CHANNEL_MESSAGE event
         */
        private function onWorkerToMainDirect(ev:Event): void
        {
            // Record the message and show progress (this version is slow)
            cur++;
            logger.text = "MessageChannel: " + cur + " / " + REPS;

            // If this wasn't the last message, send another message to the
            // worker thread
            if (cur < REPS)
            {
                mainToWorker.send("1");
                return;
            }

            // The MessageChannel test is done. Record the time it took.
            var afterTime:int = getTimer();
            var messageChannelTime:int = afterTime - beforeTime;

            // Run the setSharedProperty test where the two threads communicate
            // by directly setting shared properties on the worker thread. The
            // main thread receives "2", sends "1", and is responsible for
            // starting the process with an initial "1" message.
            beforeTime = getTimer();
            setSharedPropertyTest(worker, "2", "1", true);
            afterTime = getTimer();
            var setSharedPropertyTime:int = afterTime - beforeTime;

            // Run the mutex test where the two threads communicate by directly
            // setting shared properties on the worker thread and pause
            // execution via a mutex instead of constantly polling via a tight
            // while loop. The main thread receives "2", sends "1", and is
            // responsible for starting the process with an initial "1" message.
            beforeTime = getTimer();
            mutexTest(worker, "2", "1", true);
            afterTime = getTimer();
            var mutexTime:int = afterTime - beforeTime;

            // Clear the logger and show the results instead
            logger.text = "";
            row("Type", "Time", "Messages/sec");
            row("MessageChannel", messageChannelTime, toMessagesPerSecond(messageChannelTime));
            row("setSharedProperty", setSharedPropertyTime, toMessagesPerSecond(setSharedPropertyTime));
            row("mutex", mutexTime, toMessagesPerSecond(mutexTime));
        }

        /**
         * Convert the elapsed time of one test run into a message rate
         * @param elapsedMs Milliseconds taken to exchange REPS messages
         * @return Messages per second, or positive infinity when the elapsed
         *         time is too small for getTimer() to measure
         */
        private function toMessagesPerSecond(elapsedMs:int): Number
        {
            if (elapsedMs <= 0)
            {
                return Number.POSITIVE_INFINITY;
            }

            // REPS messages in elapsedMs milliseconds, scaled up to one second
            return (Number(REPS) * 1000) / elapsedMs;
        }

        /**
         * Perform the setSharedProperty test where the two threads communicate
         * by directly setting shared properties on the worker thread. This
         * version uses a tight polling loop to repeatedly check until the
         * incoming message is ready.
         * @param worker Worker the shared properties are set on
         * @param inMsg Expected message this thread receives from the other
         * @param outMsg Message to send to the other thread
         * @param sendInitial If an initial message should be sent
         */
        private function setSharedPropertyTest(
            worker:Worker,
            inMsg:String,
            outMsg:String,
            sendInitial:Boolean
        ): void
        {
            // Reset the count from the first test
            cur = 0;

            // Optionally send an initial outgoing message to start the process
            if (sendInitial)
            {
                worker.setSharedProperty("message", outMsg);
            }

            // Send messages until we've hit the limit
            while (cur < REPS)
            {
                // Check to see if the shared property is the incoming message
                if (worker.getSharedProperty("message") == inMsg)
                {
                    // Record the message and send a response by setting the
                    // shared property to the outgoing message
                    cur++;
                    worker.setSharedProperty("message", outMsg);
                }
            }
        }

        /**
         * Perform the setSharedProperty test where the two threads communicate
         * by directly setting shared properties on the worker thread. This
         * version uses a Mutex to pause execution of the worker/thread until the
         * incoming message is ready.
         * @param worker Worker the shared properties are set on
         * @param inMsg Expected message this thread receives from the other.
         *              Not inspected by this version; it is accepted so the
         *              signature matches setSharedPropertyTest.
         * @param outMsg Message to send to the other thread
         * @param sendInitial If an initial message should be sent. The thread
         *                    that sends the initial message also creates and
         *                    shares the Mutex.
         */
        private function mutexTest(
            worker:Worker,
            inMsg:String,
            outMsg:String,
            sendInitial:Boolean
        ): void
        {
            // Reset the count from the first test
            cur = 0;

            // Optionally send an initial outgoing message to start the process
            var mutex:Mutex;
            if (sendInitial)
            {
                mutex = new Mutex();
                worker.setSharedProperty("mutex", mutex);
                worker.setSharedProperty("message", outMsg);
            }
            else
            {
                // Wait for the main thread to send the mutex
                do
                {
                    mutex = worker.getSharedProperty("mutex") as Mutex;
                }
                while (mutex == null);
            }

            // Send messages until we've hit the limit
            // NOTE(review): lock()/unlock() alone does not appear to force the
            // two threads to strictly alternate - confirm whether that matters
            // for what this benchmark is meant to measure.
            while (cur < REPS)
            {
                // Wait for the other thread to unlock the mutex. When they do,
                // the incoming message is ready.
                mutex.lock();
                try
                {
                    // Record the message and send a response by setting the
                    // shared property to the outgoing message
                    cur++;
                    worker.setSharedProperty("message", outMsg);
                }
                finally
                {
                    // Notify the other thread that the outgoing message is
                    // ready by unlocking the mutex. Doing this in finally
                    // keeps the other thread from blocking forever in lock()
                    // if the code above throws.
                    mutex.unlock();
                }
            }
        }
    }
}

Run the test I ran this test in the following environment:
And here are the results I got:
The (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |