Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use Treq for Twisted integration #106

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 18 additions & 77 deletions rollbar/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,59 +93,8 @@ def wrap(*args, **kwargs):
TornadoAsyncHTTPClient = None

try:
from twisted.internet import reactor
from twisted.internet.defer import inlineCallbacks, Deferred, returnValue, succeed
from twisted.internet.protocol import Protocol
import treq
from twisted.python import log as twisted_log
from twisted.web.client import Agent as TwistedHTTPClient
from twisted.web.http_headers import Headers as TwistedHeaders
from twisted.web.iweb import IBodyProducer

from zope.interface import implementer


try:
# Verify we can make HTTPS requests with Twisted.
# From http://twistedmatrix.com/documents/12.0.0/core/howto/ssl.html
from OpenSSL import SSL
except ImportError:
log.exception('Rollbar requires SSL to work with Twisted')
raise


@implementer(IBodyProducer)
class StringProducer(object):

def __init__(self, body):
self.body = body
self.length = len(body)

def startProducing(self, consumer):
consumer.write(self.body)
return succeed(None)

def pauseProducing(self):
pass

def stopProducing(self):
pass


class ResponseAccumulator(Protocol):
def __init__(self, length, finished):
self.remaining = length
self.finished = finished
self.response = ''

def dataReceived(self, bytes):
if self.remaining:
chunk = bytes[:self.remaining]
self.response += chunk
self.remaining -= len(chunk)

def connectionLost(self, reason):
self.finished.callback(self.response)


def log_handler(event):
"""
Expand All @@ -171,9 +120,7 @@ def log_handler(event):


except ImportError:
TwistedHTTPClient = None
inlineCallbacks = passthrough_decorator
StringProducer = None
treq = None


def get_request():
Expand Down Expand Up @@ -450,8 +397,8 @@ def send_payload(payload, access_token):
return
_send_payload_appengine(payload, access_token)
elif handler == 'twisted':
if TwistedHTTPClient is None:
log.error('Unable to find twisted')
if treq is None:
log.error('Unable to find Treq')
return
_send_payload_twisted(payload, access_token)
else:
Expand Down Expand Up @@ -1246,31 +1193,25 @@ def _send_payload_twisted(payload, access_token):
log.exception('Exception while posting item %r', e)


@inlineCallbacks

def _post_api_twisted(path, payload, access_token=None):
headers = {'Content-Type': ['application/json']}

def post_data_cb(data, resp):
resp._content = data
_parse_response(path, settings['access_token'], payload, resp)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

settings should be SETTINGS


def post_cb(resp):
r = requests.Response()
r.status_code = resp.code
r.headers.update(resp.headers.getAllRawHeaders())
return treq.content(response).addCallback(post_data_cb, r)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is breaking for me here. Did you mean to write return treq.content(resp).addCallback(post_data_cb, r)? I think that should do what you intended.


headers = {'Content-Type': ['application/json']}
if access_token is not None:
headers['X-Rollbar-Access-Token'] = [access_token]

url = urljoin(SETTINGS['endpoint'], path)

agent = TwistedHTTPClient(reactor, connectTimeout=SETTINGS.get('timeout', DEFAULT_TIMEOUT))
resp = yield agent.request(
'POST',
url,
TwistedHeaders(headers),
StringProducer(payload))

r = requests.Response()
r.status_code = resp.code
r.headers.update(resp.headers.getAllRawHeaders())
bodyDeferred = Deferred()
resp.deliverBody(ResponseAccumulator(resp.length, bodyDeferred))
body = yield bodyDeferred
r._content = body
_parse_response(path, SETTINGS['access_token'], payload, r)
yield returnValue(None)
d = treq.post(url, payload, headers=headers, timeout=SETTINGS.get('timeout', DEFAULT_TIMEOUT))
d.addCallback(post_cb)


def _parse_response(path, access_token, params, resp, endpoint=None):
Expand Down