_socket = fsockopen($str, $port, $errno, $errstr); if ($this->_socket === false) { // aparently there is some reason that fsockopen will return false // but it normally throws an error. require_once 'Zend/Queue/Exception.php'; throw new Zend_Queue_Exception("Unable to connect to $str; error = $errstr ( errno = $errno )"); } stream_set_blocking($this->_socket, 0); // non blocking if (!isset($options['timeout_sec'])) { $options['timeout_sec'] = self::READ_TIMEOUT_DEFAULT_SEC; } if (! isset($options['timeout_usec'])) { $options['timeout_usec'] = self::READ_TIMEOUT_DEFAULT_USEC; } $this->_options = $options; return true; } /** * Close the socket explicitly when destructed * * @return void */ public function __destruct() { } /** * Close connection * * @param boolean $destructor * @return void */ public function close($destructor = false) { // Gracefully disconnect if (!$destructor) { $frame = $this->createFrame(); $frame->setCommand('DISCONNECT'); $this->write($frame); } // @todo: Should be fixed. // When the socket is "closed", it will trigger the below error when php exits // Fatal error: Exception thrown without a stack frame in Unknown on line 0 // Danlo: I suspect this is because this has already been claimed by the interpeter // thus trying to shutdown this resources, which is already shutdown is a problem. if (is_resource($this->_socket)) { // fclose($this->_socket); } // $this->_socket = null; } /** * Check whether we are connected to the server * * @return true * @throws Zend_Queue_Exception */ public function ping() { if (!is_resource($this->_socket)) { require_once 'Zend/Queue/Exception.php'; throw new Zend_Queue_Exception('Not connected to Stomp server'); } return true; } /** * Write a frame to the stomp server * * example: $response = $client->write($frame)->read(); * * @param Zend_Queue_Stom_FrameInterface $frame * @return $this */ public function write(Zend_Queue_Stomp_FrameInterface $frame) { $this->ping(); $output = $frame->toFrame(); $bytes = fwrite($this->_socket, $output, strlen($output)); if ($bytes === false || $bytes == 0) { require_once 'Zend/Queue/Exception.php'; throw new Zend_Queue_Exception('No bytes written'); } return $this; } /** * Tests the socket to see if there is data for us * * @return boolean */ public function canRead() { $read = array($this->_socket); $write = null; $except = null; return stream_select( $read, $write, $except, $this->_options['timeout_sec'], $this->_options['timeout_usec'] ) == 1; // see http://us.php.net/manual/en/function.stream-select.php } /** * Reads in a frame from the socket or returns false. * * @return Zend_Queue_Stomp_FrameInterface|false * @throws Zend_Queue_Exception */ public function read() { $this->ping(); $response = ''; $prev = ''; // while not end of file. while (!feof($this->_socket)) { // read in one character until "\0\n" is found $data = fread($this->_socket, 1); // check to make sure that the connection is not lost. if ($data === false) { require_once 'Zend/Queue/Exception.php'; throw new Zend_Queue_Exception('Connection lost'); } // append last character read to $response $response .= $data; // is this \0 (prev) \n (data)? END_OF_FRAME if (ord($data) == 10 && ord($prev) == 0) { break; } $prev = $data; } if ($response === '') { return false; } $frame = $this->createFrame(); $frame->fromFrame($response); return $frame; } /** * Set the frameClass to be used * * This must be a Zend_Queue_Stomp_FrameInterface. * * @param string $classname - class is an instance of Zend_Queue_Stomp_FrameInterface * @return $this; */ public function setFrameClass($classname) { $this->_options['frameClass'] = $classname; return $this; } /** * Get the frameClass * * @return string */ public function getFrameClass() { return isset($this->_options['frameClass']) ? $this->_options['frameClass'] : 'Zend_Queue_Stomp_Frame'; } /** * Create an empty frame * * @return Zend_Queue_Stomp_FrameInterface */ public function createFrame() { $class = $this->getFrameClass(); if (!class_exists($class)) { require_once 'Zend/Loader.php'; Zend_Loader::loadClass($class); } $frame = new $class(); if (!$frame instanceof Zend_Queue_Stomp_FrameInterface) { require_once 'Zend/Queue/Exception.php'; throw new Zend_Queue_Exception('Invalid Frame class provided; must implement Zend_Queue_Stomp_FrameInterface'); } return $frame; } }