Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Asyncore simple requester #187

Open
LoLiik opened this issue Aug 17, 2018 · 9 comments
Open

Asyncore simple requester #187

LoLiik opened this issue Aug 17, 2018 · 9 comments

Comments

@LoLiik
Copy link

LoLiik commented Aug 17, 2018

Hello!
I want to make simple asyncronius snmp-requester.
I take example from snmplabs.com

The questions are :

  1. how to get asyncronius behaviour? (see last string of code with print)
  2. i have to runDispatcher() after every getCmd or what the right way of manipulating with SnmpEngine ?
from pysnmp.hlapi.asyncore import *

class Requester:

    def __init__(self):
        self.__snmp_engine = SnmpEngine()

    def send_get_request(self, ip, oids):
        oid_objects = []
        for oid in oids:
            oid_objects.append(ObjectType(ObjectIdentity(oid)))
        # Add request to __snmp_engine
        getCmd(
            self.__snmp_engine,
            CommunityData('public', mpModel=1),
            UdpTransportTarget((ip, 161)),
            ContextData(),
            *oid_objects,
            cbFun=Requester.snmp_response_callback,
            lookupMib=False
        )
        # Send request
        self.__run_dispatcher()

    # Callback function (cbFun) for snmp request
    @staticmethod
    def snmp_response_callback(snmpEngine, sendRequestHandle, errorIndication, errorStatus, errorIndex, varBinds, cbCtx):
        if errorIndication:
            print(errorIndication)
            return
        elif errorStatus:
            print('%s at %s' % (errorStatus.prettyPrint(),
                                errorIndex and varBindTable[-1][int(errorIndex) - 1][0] or '?'))
            return
        else:
                for varBind in varBinds:
                    print(' = '.join([x.prettyPrint() for x in varBind]))

    def __run_dispatcher(self):
        # self.__snmp_engine.transportDispatcher.jobStarted(1)
        self.__snmp_engine.transportDispatcher.runDispatcher()

# MARK: - Test

test_requester = Requester()
test_requester.send_get_request('192.168.0.160', ['.1.3.6.1.2.1.1.1.0']) # OK
test_requester.send_get_request('demo.snmplabs.com', ['.1.3.6.1.2.1.1.1.0']) #  Timeout
print("Want this string to appear right after sending last request, without waiting for timeout and to receive timeout message when timout timer is over")```
@LoLiik LoLiik changed the title Asyncore manager Asyncore simple requester Aug 17, 2018
@etingof
Copy link
Owner

etingof commented Aug 23, 2018

I think the misunderstanding here is that with asynchronous model you prepare/send all your queries and later in time you fire up the main I/O loop which might send out pending packets and will wait for all the responses from the peers that are expected to respond eventually.

Here is your code I tried to rework a bit along the lines above:

from pysnmp.hlapi.asyncore import *

class Requester:

    def __init__(self):
        self.__snmp_engine = SnmpEngine()

    def schedule_get_request(self, ip, oids):
        oid_objects = [ObjectType(ObjectIdentity(oid)) for oid in oids]

        # Build SNMP GET query and place raw packet into asyncore outgoing buffer
        getCmd(
            self.__snmp_engine,
            CommunityData('public', mpModel=1),
            UdpTransportTarget((ip, 161)),
            ContextData(),
            *oid_objects,
            cbFun=Requester.snmp_response_callback,
            lookupMib=False
        )

    # Callback function (cbFun) for snmp request
    @staticmethod
    def snmp_response_callback(snmpEngine, sendRequestHandle, errorIndication, errorStatus, errorIndex, varBinds, cbCtx):
        if errorIndication:
            print(errorIndication)
            return
        elif errorStatus:
            print('%s at %s' % (errorStatus.prettyPrint(),
                                errorIndex and varBindTable[-1][int(errorIndex) - 1][0] or '?'))
            return
        else:
                for varBind in varBinds:
                    print(' = '.join([x.prettyPrint() for x in varBind]))

    def run_dispatcher(self):
        self.__snmp_engine.transportDispatcher.runDispatcher()

# MARK: - Test

test_requester = Requester()
test_requester.schedule_get_request('192.168.0.160', ['.1.3.6.1.2.1.1.1.0']) # OK
test_requester.schedule_get_request('demo.snmplabs.com', ['.1.3.6.1.2.1.1.1.0']) #  Timeout

# Send out all pending requests at once, wait for all the responses or time outs
test_requester.run_dispatcher()

print("Want this string to appear right after sending last request, without waiting for timeout and to receive timeout message when timout timer is over")```

@etingof
Copy link
Owner

etingof commented Aug 23, 2018

Ah, you can interleave these build/wait phases, the main thing is that you should only drop into the waiting state after you are done all the more urgent stuff, because waiting might take time (up to a timeout or a timer tick).

@LoLiik
Copy link
Author

LoLiik commented Aug 23, 2018

@etingof , thank you for your attention!

And if i want to send another request immediately after i've runned Dispatcher, i have to run multiple Requesters in different threads and to have common queue ?
I've thought that i can avoid threads with asyncronous approach.

@etingof
Copy link
Owner

etingof commented Aug 23, 2018

And if i want to send another request immediately after i've runned Dispatcher, i have to run multiple Requesters in different threads and to have common queue ?

No-no-no! That would be a weird mix of asynchronous and threaded models (which would work though).

I think we need to define "immediately"...

Generally, the whole thing is event-driven. So you need an event to trigger your request scheduling code. The event could be for example a reception of a response (or timeout). Or if there is no specific event to piggyback on, you can register your own callable with transport dispatcher so you get your code run every once in a while where you could decide is it time to schedule another query or not yet. Once you return from your timer function, the control is back to the hands of the main loop.

@LoLiik
Copy link
Author

LoLiik commented Aug 23, 2018

For example, another instance (let call it Boss) has my Requester as a property.
Boss wants to send requests by my Requester, and Boss wants to receive responses as soon as it possible.
Boss runs Requster.schedule_get_request() twice in a row (with First and Second requests).
Destination of Second is unreachable and will have timeout.
Than Boss runs dispatcher.
And then Boss wants to send new (Third) request with schedule_get_request(), but the dispatcher is waiting for timeout.
Can Boss send and receive Third request without waiting for timeout of Second request ends?

@etingof
Copy link
Owner

etingof commented Aug 26, 2018

Can Boss send and receive Third request without waiting for timeout of Second request ends?

Yes, but we need an event to break out of the waiting main loop e.g. run_dispatcher() for a moment. The most obvious solution is to have Boss registering itself with pysnmp timer:

def timer_callback(now):
    # potentially, call schedule_get_request() for third request

self.__snmp_engine.transportDispatcher.registerTimerCbFun(timer_callback, tickInterval=0.5):

This way, while being in run_dispatcher() waiting for response, timer_callback will be called every 0.5 sec. Here you could check with Boss if they want to send third request and schedule sending it if so.

@LoLiik
Copy link
Author

LoLiik commented Aug 28, 2018

Can i check if my dispatcher is running?

i.e. should i runDispatcher() again for third request (if previous requests had been responsed) or shedule it in timer_callback()

@etingof
Copy link
Owner

etingof commented Aug 28, 2018

should i runDispatcher() again for third request

Typically, you run main loop (runDispatcher() just once at the end of your script), the main loop is a blocking call e.g. it might only return when no pending requests left. All the live happens in your callback functions being triggered by external events.

You can make runDispatcher() running forever or make it running just for as long as you have at least one request not answered/timed out. In the latter case you will have runDispatcher() returned so you would have to re-start runDispatcher() once get some new queries to process.

@LoLiik
Copy link
Author

LoLiik commented Aug 28, 2018

In my case Boss class is online service, and it wants to send snmp-requests through Requester class all the time by user intense.
So

runDispatcher() running forever

seems right choise for my task.
How can i start dispatcher running forever without any initial getCmd ?

If user want to send first and second requests (from example above), and then want to send third and other requests after that - should i run my dispatcher in separate thread and make it run forever, looking at list/array/queue of new requests ?

For now end up with running dispatcher in separate thread - looks awkward... appreciate any other suggestions!

from pysnmp.hlapi.asyncore import *
import threading

class Requester:

    def __init__(self):
        self.snmp_engine = SnmpEngine()

    def register_get_request(self, ip, oids):
        oid_objects = []
        for oid in oids:
            oid_objects.append(ObjectType(ObjectIdentity(oid)))
        getCmd(
            self.snmp_engine,
            CommunityData('public', mpModel=1),
            UdpTransportTarget((ip, 161), 5, 1),
            ContextData(),
            *oid_objects,
            cbFun=self.snmp_response_callback,
            lookupMib=False
        )

    # Callback function (cbFun) for snmp request
    def snmp_response_callback(self, nmpEngine, sendRequestHandle, errorIndication, errorStatus, errorIndex, varBinds, cbCtx):
        if errorIndication:
            print(errorIndication)
            return
        elif errorStatus:
            print('%s at %s' % (errorStatus.prettyPrint(), errorIndex or '?'))
            return
        else:
            for varBind in varBinds:
                print(' = '.join([x.prettyPrint() for x in varBind]))

    def run_dispatcher(self):
        self.snmp_engine.transportDispatcher.runDispatcher()

    def register_timer_cbFun(self, timer_callback):
        if self.snmp_engine.transportDispatcher is not None:
            self.snmp_engine.transportDispatcher.registerTimerCbFun(timer_callback, tickInterval=0.05)


class Boss:
    def __init__(self):
        self.requester = Requester()
        self.queue = []

        # FIXME: initial requests for transportDispatcher initialization.
        self.requester.register_get_request('192.168.0.160', ['.1.3.6.1.2.1.1.1.0'])
        self.requester.register_get_request('demo.snmplabs.com', ['.1.3.6.1.2.1.1.1.0'])

        self.requester.register_timer_cbFun(self.timer_callback)
        dispatcher_thread = threading.Thread(target=self.requester.run_dispatcher)
        dispatcher_thread.start()

    def timer_callback(self, timeNow):
        for request_info in self.queue:
            ip, oids = request_info
            self.requester.register_get_request(ip, oids)
            self.queue.remove(request_info)

    def new_user_call(self, ip, oids):
        self.queue.append((ip, oids))


# MARK: - Test

boss = Boss()

for i in range(1000):
    boss.new_user_call('192.168.0.160', ['.1.3.6.1.2.1.1.1.0'])

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants