Skip to content

Commit

Permalink
Make Version0X works again.
Browse files Browse the repository at this point in the history
Signed-off-by: Toha <tohenk@yahoo.com>
  • Loading branch information
tohenk committed Feb 1, 2024
1 parent fb40422 commit 8be78c1
Showing 1 changed file with 106 additions and 17 deletions.
123 changes: 106 additions & 17 deletions src/Engine/SocketIO/Version0X.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,13 @@
namespace ElephantIO\Engine\SocketIO;

use InvalidArgumentException;
use RuntimeException;
use stdClass;
use ElephantIO\Engine\AbstractSocketIO;
use ElephantIO\Exception\ServerConnectionFailureException;
use ElephantIO\Payload\Encoder;
use ElephantIO\SequenceReader;
use ElephantIO\Util;

/**
* Implements the dialog with Socket.IO version 0.x
Expand All @@ -27,11 +31,11 @@
*/
class Version0X extends AbstractSocketIO
{
public const PROTO_CLOSE = 0;
public const PROTO_OPEN = 1;
public const PROTO_DISCONNECT = 0;
public const PROTO_CONNECT = 1;
public const PROTO_HEARTBEAT = 2;
public const PROTO_MESSAGE = 3;
public const PROTO_JOIN_MESSAGE = 4;
public const PROTO_JSON = 4;
public const PROTO_EVENT = 5;
public const PROTO_ACK = 6;
public const PROTO_ERROR = 7;
Expand All @@ -44,6 +48,7 @@ public function connect()
return;
}

$this->setTransport($this->options['transport']);
$this->handshake();
$this->upgradeTransport();
}
Expand All @@ -56,20 +61,55 @@ public function close()
}

if ($this->session) {
$this->send(static::PROTO_CLOSE);
$this->send(static::PROTO_DISCONNECT);
}
$this->reset();
}

/** {@inheritDoc} */
public function emit($event, array $args)
{
$this->send(static::PACKET_EVENT, json_encode(['name' => $event, 'args' => $args]));
$this->send(static::PROTO_EVENT, json_encode(['name' => $event, 'args' => $args]));
}

/** {@inheritDoc} */
public function wait($event)
{
while (true) {
if ($packet = $this->drain(0)) {
if ($packet->proto === static::PROTO_EVENT && $this->matchNamespace($packet->nsp)) {
return $packet;
}
}
}
}

/** {@inheritDoc} */
public function drain($timeout = 0)
{
$result = null;
$data = $this->read($timeout);
if (null !== $data) {
$this->logger->debug(sprintf('Got data: %s', Util::truncate((string) $data)));
$packet = $this->decodePacket($data);
switch ($packet->proto) {
case static::PROTO_DISCONNECT:
$this->logger->debug('Connection closed by server');
$this->reset();
throw new RuntimeException('Connection closed by server!');
case static::PROTO_HEARTBEAT:
$this->logger->debug('Got HEARTBEAT');
$this->send(static::PROTO_HEARTBEAT);
break;
case static::PROTO_NOOP:
break;
default:
$result = $packet;
break;
}
}

return $result;
}

/** {@inheritDoc} */
Expand All @@ -78,11 +118,10 @@ public function send($code, $message = null)
if (!$this->connected()) {
return;
}
if (!is_int($code) || 0 > $code || 6 < $code) {
if (!is_int($code) || $code < static::PROTO_DISCONNECT || $code > static::PROTO_NOOP) {
throw new InvalidArgumentException('Wrong message type to sent to socket');
}

$payload = $this->getPayload($code . '::' . $this->namespace . ':' . $message);
$payload = $this->getPayload($code . '::' . $this->normalizeNamespace($this->namespace) . ($message ? ':' . $message : ''));

return $this->write((string) $payload);
}
Expand Down Expand Up @@ -111,7 +150,10 @@ public function of($namespace)
{
parent::of($namespace);

$this->send(static::PROTO_OPEN);
$this->send(static::PROTO_CONNECT);
if (($packet = $this->drain()) && $packet->proto === static::PROTO_CONNECT) {
$this->logger->debug('Successfully connected');
}
}

/** {@inheritDoc} */
Expand All @@ -125,7 +167,7 @@ protected function getDefaultOptions()
{
return [
'version' => 1,
'transport' => static::TRANSPORT_WEBSOCKET,
'transport' => static::TRANSPORT_POLLING,
];
}

Expand All @@ -142,7 +184,46 @@ protected function getPayload($data, $encoding = Encoder::OPCODE_TEXT)
return new Encoder($data, $encoding, true);
}

/** Does the handshake with the Socket.io server and populates the `session` value object */
/**
* Decode a packet.
*
* @param string $data
* @return \stdClass
*/
protected function decodePacket($data)
{
$seq = new SequenceReader($data);
$proto = (int) $seq->readUntil(':');
if ($proto >= static::PROTO_DISCONNECT && $proto <= static::PROTO_NOOP) {
$ack = $seq->readUntil(':');
$packet = new stdClass();
$packet->proto = $proto;
$packet->nsp = $seq->readUntil(':');
$packet->data = !$seq->isEof() ? $seq->getData() : null;
switch ($packet->proto) {
case static::PROTO_MESSAGE:
case static::PROTO_JSON:
if ($packet->data) {
$packet->data = json_decode($packet->data, true);
}
break;
case static::PROTO_EVENT:
if ($packet->data) {
$data = json_decode($packet->data, true);
$packet->event = $data['name'];
$packet->args = $data['args'];
$packet->data = count($packet->args) ? $packet->args[0] : null;
}
break;
}

return $packet;
}
}

/**
* Do the handshake with the socket.io server and populates the `session` value object.
*/
protected function handshake()
{
if (null !== $this->session) {
Expand All @@ -163,7 +244,7 @@ protected function handshake()
'sid' => $sess[0],
'pingInterval' => $sess[1],
'pingTimeout' => $sess[2],
'upgrades' => array_flip(explode(',', $sess[3])),
'upgrades' => explode(',', $sess[3]),
];
$this->storeSession($handshake, $this->stream->getHeaders());

Expand All @@ -174,7 +255,8 @@ protected function handshake()
protected function upgradeTransport()
{
// check if websocket upgrade is needed
if (!in_array(static::TRANSPORT_WEBSOCKET, $this->session->upgrades)) {
if (!in_array(static::TRANSPORT_WEBSOCKET, $this->session->upgrades) ||
!$this->isTransportEnabled(static::TRANSPORT_WEBSOCKET)) {
return;
}

Expand All @@ -183,7 +265,9 @@ protected function upgradeTransport()
// set timeout based on handshake response
$this->setTimeout($this->session->getTimeout());

if ($this->doPoll(null, null, $this->getUpgradeHeaders(), ['skip_body' => true]) == 101) {
if ($this->doPoll(static::TRANSPORT_WEBSOCKET, null, $this->getUpgradeHeaders(), ['skip_body' => true]) == 101) {
$this->setTransport(static::TRANSPORT_WEBSOCKET);

$this->logger->debug('Websocket upgrade completed');
} else {
$this->logger->debug('Upgrade failed, skipping websocket');
Expand All @@ -198,7 +282,12 @@ protected function getTransports()

protected function buildQueryParameters($transport)
{
$path = [$this->options['version'], $transport ?? $this->options['transport']];
$transports = [static::TRANSPORT_POLLING => 'xhr-polling'];
$transport = $transport ?? $this->options['transport'];
if (isset($transports[$transport])) {
$transport = $transports[$transport];
}
$path = [$this->options['version'], $transport];
if ($this->session) {
$path[] = $this->session->id;
}
Expand All @@ -210,8 +299,8 @@ protected function buildQuery($query)
{
$url = $this->stream->getUrl()->getParsed();
$uri = sprintf('/%s/%s', trim($url['path'], '/'), implode('/', $query['path']));
if (isset($url['query'])) {
$uri .= '/?' . http_build_query($url['query']);
if (isset($url['query']) && $params = http_build_query($url['query'])) {
$uri .= '/?' . $params;
}

return $uri;
Expand Down

0 comments on commit 8be78c1

Please sign in to comment.