{ "cells": [ { "cell_type": "markdown", "id": "annoying-preservation", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "# A first Example using Sarracenia Moth API\n", "\n", "Sarracenia is a package built to announce the availability of new data, usually as files.\n", "We put files on standard servers, making them available via web or sftp, and tell\n", "users that they have arrived using messages. \n", "\n", "Sarracenia uses existing standard message passing protocols, like rabbitmq/AMQP to transport the messages,\n", "and in message passing circles, as server that distributes messages is called a *broker*.\n", "\n", "We call the combination of a message broker, and a file server (which can be a single server, or a large cluster) a **data pump**.\n", "\n", "Assuming you have installed the **metpx-sr3** package, either as a debian package, or via pip,\n", "One way access announcements to use with sarracenia.moth (Messages Organized by Topic Headers) class,\n", "which allows a python program to connect to a Sarracenia server, and start receiving \n", "messages that announce resources.\n", "\n", "The factory to build sarracenia.moth objects requires a dictionary of settings as an argument: \n", "\n", "\n", "* options: a dictionary of other settings the class might use.\n", "\n", " * 'broker': an object (Credential) containing a url pointing to the message server that is announcing products, and other related options.\n", "\n", " \n", "\n", "The example below builds a call to an broker anyone can access, and just request\n", "5 announcements.\n", "\n", "You can run it, and then we can discuss a few settings:\n" ] }, { "cell_type": "code", "execution_count": 4, "id": "romance-handy", "metadata": { "scrolled": true, "slideshow": { "slide_type": "slide" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "options: {'acceptUnmatched': True, 'batch': 25, 'bindings': [('xpublic', ['v02', 'post'], ['#'])], 'broker': , 'dry_run': False, 'exchange': None, 'expire': None, 'inline': False, 'inlineEncoding': 'guess', 'inlineByteMax': 4096, 'logFormat': '%(asctime)s [%(levelname)s] %(name)s %(funcName)s %(message)s', 'logLevel': 'info', 'messageDebugDump': False, 'message_strategy': {'reset': True, 'stubborn': True, 'failure_duration': '5m'}, 'message_ttl': 0, 'topicPrefix': ['v02', 'post'], 'tlsRigour': 'normal', 'auto_delete': False, 'durable': True, 'exchangeDeclare': True, 'prefetch': 25, 'queueName': 'q_anonymous_fractal_SomethingHelpfulToYou', 'queueBind': True, 'queueDeclare': True, 'reset': False, 'subtopic': [], 'vhost': '/'}\n" ] } ], "source": [ "import sarracenia.moth\n", "import sarracenia.moth.amqp\n", "import sarracenia.credentials\n", "\n", "import time\n", "import socket\n", "\n", "options = sarracenia.moth.default_options\n", "\n", "options.update(sarracenia.moth.amqp.default_options)\n", "\n", "options['broker'] = sarracenia.credentials.Credential('amqps://anonymous:anonymous@hpfx.collab.science.gc.ca')\n", "options['topicPrefix'] = [ 'v02', 'post' ]\n", "options['bindings'] = [('xpublic', ['v02', 'post'], ['#'])]\n", "options['queueName'] = 'q_anonymous_' + socket.getfqdn() + '_SomethingHelpfulToYou'\n", "\n", "print('options: %s' % options)\n", "\n" ] }, { "cell_type": "markdown", "id": "figured-estimate", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "\n", "The **broker** setting is an object containing a conventional URL and other options, indicating the messaging protocol to be used to access the upstream server. When you connect to a broker, you need to tell it what messages you are interested in.\n", "In Moth, all the brokers we are accessing are expected to use topic hierarchies. You can see them if you\n", "successfully ran the example above, there should be in the message print outs a \"topic\" element in \n", "dictionaries. Here is an example of one:\n", "\n", "__v02.post.20210213.WXO-DD.observations.swob-ml.20210213.CTZR__\n", "\n", "This divides into two parts:\n", "\n", "* topic_prefix: v02.post\n", "* the rest of the topic tree is a reflection of the path to the announced product, relative to a base directory.\n", "\n", "\n", "In AMQP, there is the concept of \"exchanges\" which are sort of analogous to television channels... they are groupings of announcements. so to connect to an AMQP broker, one needs to specify: \n", "\n", "* exchange: Sarracenia promulgates xpublic as a conventional default.\n", "* topic_prefix: decide which version of messages you want to obtain. This server is producing v02 ones.\n", "* subtopic: what subset of topic_prefix messages do we want to subscribe to.\n", "\n", "\n", "## Bindings\n", "\n", "The bindings option sets out the three values above. in the example, The bindings are:\n", "\n", "* topic_prefix: v02.post (get v02 messages.)\n", "* exchange: xpublic (the default one.)\n", "* subtopic: # ( an AMQP wildcard meaning everything. )\n", "\n", "we connect to the\n", "\n", "amqp://hpfx.collab.science.gc.ca broker, on the *xpublic* exchange, and the we will be interested in all messages matching the v02.post.# topic specification... (which is all v02 messages available.)\n", "\n", "### subtopic\n", "\n", "The subtopic here ( __#__ ) is matches everything produced on the server. The wider the subtopic, the more messages have to be sent, and the more processing done. It is better to make it narrower. Taking the example above, if we are interested in swob, a subtopic like:\n", "\n", "* *.WXO-DD.observations.swob-ml.#\n", "\n", "would match all of the swobs similar to the one above, but avoid sending messages for non-swobs to you.\n", "\n", "\n", "## queueName\n", "\n", "By convention in brokers administered by Sarracenia, users can only create queues that start with q_ followed by their user name. we connected as anonymous, and so q_anonymous must be used. After that, the rest can be whatever you want, but there are a few considerations:\n", "\n", "* If you want to start up multiple python processes to share a data feed, they all specify the same queue_name, and they will share the flow of messages. It scales well for a few dozen co-operating downloaders, but does not scale infinitely, do not expect more than 99 or so processes to be able to effectively share a load from a single queue. To scale beyond that with AMQP, multiple selections are better.\n", "\n", "* if you are going to ask for help from the data pump admins... you are going to need to supply them the name of the queue, and they may need to be able to pick it out of hundreds or thousands that are on the server.\n", "\n", "\n", "## Messages\n", "\n", "Different messaging protocols have different storage structures and conventions. the MoTH class returns\n", "messages as python dictionaries regardless of what protocol is used to obtain them or, if forwarding them, to transmit them. One can add fields for programmatic use to the messages just by adding elements to the dictionary.\n", "If they are only for internal use, then add the name of the dictionary element to the special '\\_deleteOnPost' key, so that the dictionary element will be dropped when forwarding the message.\n", "\n", "\n", "## Ack\n", "\n", "Messages are marked in transit by the broker, and if you do not acknowledge them, the data pump will hold onto them, and eventually re-dispatch them. keeping pending messages in memory will also slow down processing of all messages. One should acknowledge receipt of messages as soon as practicable, but not so soon that you will lose data if the the program is interrupted. In the example, we acknowledge after we have done our work of printing the message.\n", "\n", "\n" ] }, { "cell_type": "code", "execution_count": 6, "id": "little-louis", "metadata": { "scrolled": true }, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "2023-05-27 10:55:22,559 [INFO] sarracenia.moth.amqp __getSetup queue declared q_anonymous_fractal_SomethingHelpfulToYou (as: amqps://anonymous@hpfx.collab.science.gc.ca) \n", "2023-05-27 10:55:22,560 [INFO] sarracenia.moth.amqp __getSetup binding q_anonymous_fractal_SomethingHelpfulToYou with v02.post.# to xpublic (as: amqps://anonymous@hpfx.collab.science.gc.ca)\n" ] }, { "name": "stdout", "output_type": "stream", "text": [ "message 0: {'_format': 'v02', '_deleteOnPost': {'subtopic', 'exchange', 'ack_id', 'local_offset', '_format'}, 'sundew_extension': 'CMC:REGIONAL:GRIB2:BIN::20230527145518', 'from_cluster': 'DDSR.CMC', 'to_clusters': 'ALL', 'filename': 'msg_ddsr-WXO-DD_34df392aeffc9c678011f3fd30193bb6:CMC:REGIONAL:GRIB2:BIN::20230527145518', 'source': 'WXO-DD', 'mtime': '20230527T145520.791', 'atime': '20230527T145520.791', 'pubTime': '20230527T145520.791', 'baseUrl': 'https://hpfx.collab.science.gc.ca', 'relPath': '/20230527/WXO-DD/model_gem_regional/10km/grib2/12/037/CMC_reg_WIND_ISBL_30_ps10km_2023052712_P037.grib2', 'subtopic': ['20230527', 'WXO-DD', 'model_gem_regional', '10km', 'grib2', '12', '037'], 'identity': {'method': 'md5', 'value': 'U1vVZnatrCeK3bLrXshb2g=='}, 'size': 554100, 'exchange': 'xpublic', 'ack_id': 1, 'local_offset': 0}\n", "first 50 bytes of corresponding file: b'GRIB\\x00\\x00\\x00\\x02\\x00\\x00\\x00\\x00\\x00\\x08tt\\x00\\x00\\x00\\x15\\x01\\x006\\x00\\x00\\x04\\x00\\x01\\x07\\xe7\\x05\\x1b\\x0c\\x00\\x00\\x00\\x02\\x00\\x00\\x00A\\x03\\x00\\x00\\x0b\\xc1\\x88\\x00\\x00\\x00'\n", "message 1: {'_format': 'v02', '_deleteOnPost': {'subtopic', 'exchange', 'ack_id', 'local_offset', '_format'}, 'sundew_extension': 'CMC:REGIONAL:GRIB2:BIN::20230527145519', 'from_cluster': 'DDSR.CMC', 'to_clusters': 'ALL', 'filename': 'msg_ddsr-WXO-DD_752eb4e8803503704990563d84030e67:CMC:REGIONAL:GRIB2:BIN::20230527145519', 'source': 'WXO-DD', 'mtime': '20230527T145520.292', 'atime': '20230527T145520.292', 'pubTime': '20230527T145520.292', 'baseUrl': 'https://hpfx.collab.science.gc.ca', 'relPath': '/20230527/WXO-DD/model_gem_regional/10km/grib2/12/037/CMC_reg_HGT_ISBL_250_ps10km_2023052712_P037.grib2', 'subtopic': ['20230527', 'WXO-DD', 'model_gem_regional', '10km', 'grib2', '12', '037'], 'identity': {'method': 'md5', 'value': 'j6bh9dbE4QbJAXEOejw0Tw=='}, 'size': 377005, 'exchange': 'xpublic', 'ack_id': 2, 'local_offset': 0}\n", "first 50 bytes of corresponding file: b'GRIB\\x00\\x00\\x00\\x02\\x00\\x00\\x00\\x00\\x00\\x05\\xc0\\xad\\x00\\x00\\x00\\x15\\x01\\x006\\x00\\x00\\x04\\x00\\x01\\x07\\xe7\\x05\\x1b\\x0c\\x00\\x00\\x00\\x02\\x00\\x00\\x00A\\x03\\x00\\x00\\x0b\\xc1\\x88\\x00\\x00\\x00'\n", "message 2: {'_format': 'v02', '_deleteOnPost': {'subtopic', 'exchange', 'ack_id', 'local_offset', '_format'}, 'sundew_extension': 'CMC:REGIONAL:GRIB2:BIN::20230527145519', 'from_cluster': 'DDSR.CMC', 'to_clusters': 'ALL', 'filename': 'msg_ddsr-WXO-DD_55f121bb28e822cffb6e61196cd924eb:CMC:REGIONAL:GRIB2:BIN::20230527145519', 'source': 'WXO-DD', 'mtime': '20230527T145521.260', 'atime': '20230527T145521.260', 'pubTime': '20230527T145521.260', 'baseUrl': 'https://hpfx.collab.science.gc.ca', 'relPath': '/20230527/WXO-DD/model_gem_regional/10km/grib2/12/037/CMC_reg_RH_ISBL_700_ps10km_2023052712_P037.grib2', 'subtopic': ['20230527', 'WXO-DD', 'model_gem_regional', '10km', 'grib2', '12', '037'], 'identity': {'method': 'md5', 'value': 'V7goy/doL6Gle68s1zoVEA=='}, 'size': 808438, 'exchange': 'xpublic', 'ack_id': 3, 'local_offset': 0}\n", "first 50 bytes of corresponding file: b'GRIB\\x00\\x00\\x00\\x02\\x00\\x00\\x00\\x00\\x00\\x0cU\\xf6\\x00\\x00\\x00\\x15\\x01\\x006\\x00\\x00\\x04\\x00\\x01\\x07\\xe7\\x05\\x1b\\x0c\\x00\\x00\\x00\\x02\\x00\\x00\\x00A\\x03\\x00\\x00\\x0b\\xc1\\x88\\x00\\x00\\x00'\n", "message 3: {'_format': 'v02', '_deleteOnPost': {'subtopic', 'exchange', 'ack_id', 'local_offset', '_format'}, 'sundew_extension': 'CMC:REGIONAL:GRIB2:BIN::20230527145518', 'from_cluster': 'DDSR.CMC', 'to_clusters': 'ALL', 'filename': 'msg_ddsr-WXO-DD_dac300cf33756ba816e030f99fc9dc22:CMC:REGIONAL:GRIB2:BIN::20230527145518', 'source': 'WXO-DD', 'mtime': '20230527T145519.586', 'atime': '20230527T145519.586', 'pubTime': '20230527T145519.586', 'baseUrl': 'https://hpfx.collab.science.gc.ca', 'relPath': '/20230527/WXO-DD/model_gem_regional/10km/grib2/12/037/CMC_reg_UGRD_ISBL_225_ps10km_2023052712_P037.grib2', 'subtopic': ['20230527', 'WXO-DD', 'model_gem_regional', '10km', 'grib2', '12', '037'], 'identity': {'method': 'md5', 'value': 'MI8XzT1uam5OUf7QlDZ4FA=='}, 'size': 487411, 'exchange': 'xpublic', 'ack_id': 4, 'local_offset': 0}\n", "first 50 bytes of corresponding file: b'GRIB\\x00\\x00\\x00\\x02\\x00\\x00\\x00\\x00\\x00\\x07o\\xf3\\x00\\x00\\x00\\x15\\x01\\x006\\x00\\x00\\x04\\x00\\x01\\x07\\xe7\\x05\\x1b\\x0c\\x00\\x00\\x00\\x02\\x00\\x00\\x00A\\x03\\x00\\x00\\x0b\\xc1\\x88\\x00\\x00\\x00'\n", "message 4: {'_format': 'v02', '_deleteOnPost': {'subtopic', 'exchange', 'ack_id', 'local_offset', '_format'}, 'sundew_extension': 'CMC:REGIONAL:GRIB2:BIN::20230527145519', 'from_cluster': 'DDSR.CMC', 'to_clusters': 'ALL', 'filename': 'msg_ddsr-WXO-DD_c5e84748169e0a6dce8f3b884ffdf059:CMC:REGIONAL:GRIB2:BIN::20230527145519', 'source': 'WXO-DD', 'mtime': '20230527T145520.651', 'atime': '20230527T145520.651', 'pubTime': '20230527T145520.651', 'baseUrl': 'https://hpfx.collab.science.gc.ca', 'relPath': '/20230527/WXO-DD/model_gem_regional/10km/grib2/12/037/CMC_reg_RH_ISBL_550_ps10km_2023052712_P037.grib2', 'subtopic': ['20230527', 'WXO-DD', 'model_gem_regional', '10km', 'grib2', '12', '037'], 'identity': {'method': 'md5', 'value': 'zukdtksA5I0C5oq/ieiXbQ=='}, 'size': 774394, 'exchange': 'xpublic', 'ack_id': 5, 'local_offset': 0}\n" ] }, { "name": "stderr", "output_type": "stream", "text": [ "2023-05-27 10:55:24,022 [INFO] sarracenia.moth.amqp getCleanUp deleteing queue q_anonymous_fractal_SomethingHelpfulToYou\n" ] }, { "name": "stdout", "output_type": "stream", "text": [ "first 50 bytes of corresponding file: b'GRIB\\x00\\x00\\x00\\x02\\x00\\x00\\x00\\x00\\x00\\x0b\\xd0\\xfa\\x00\\x00\\x00\\x15\\x01\\x006\\x00\\x00\\x04\\x00\\x01\\x07\\xe7\\x05\\x1b\\x0c\\x00\\x00\\x00\\x02\\x00\\x00\\x00A\\x03\\x00\\x00\\x0b\\xc1\\x88\\x00\\x00\\x00'\n", "obtained 5 product announcements\n" ] } ], "source": [ "h = sarracenia.moth.Moth.subFactory(options)\n", "\n", "count=0\n", "good=0\n", "while count < 5:\n", " m = h.getNewMessage() #get only one Message\n", " if m is not None:\n", " print(\"message %d: %s\" % (count,m) )\n", " content = m.getContent() \n", " print(\"first 50 bytes of corresponding file: %s\" % content[0:50])\n", " good +=1 \n", " h.ack(m)\n", " time.sleep(0.1)\n", " count += 1\n", "\n", "h.cleanup() # remove server-side queue defined by Factory.\n", "h.close()\n", "print( f\"obtained {good} product announcements\")" ] }, { "cell_type": "markdown", "id": "other-woman", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "2nd example ... combine baseURL + relPath (talk about retPath) and retrieve data...\n", "use newMessages() instead of getNewMessage to show alternate consumption ui.\n", "talk about http, and how retrieval will vary depending on the protocol listed in the baseUrl, and can get\n", "complicated.\n", "\n" ] }, { "cell_type": "code", "execution_count": 7, "id": "abroad-sense", "metadata": { "scrolled": true }, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "2023-05-27 11:00:20,655 [INFO] sarracenia.moth.amqp __getSetup queue declared q_anonymous_fractal_SomethingHelpfulToYou (as: amqps://anonymous@hpfx.collab.science.gc.ca) \n", "2023-05-27 11:00:20,656 [INFO] sarracenia.moth.amqp __getSetup binding q_anonymous_fractal_SomethingHelpfulToYou with v02.post.*.WXO-DD.observations.swob-ml.# to xpublic (as: amqps://anonymous@hpfx.collab.science.gc.ca)\n" ] }, { "name": "stdout", "output_type": "stream", "text": [ "url 0: https://hpfx.collab.science.gc.ca/20230527/WXO-DD/observations/swob-ml/20230527/CMTH/2023-05-27-1459-CMTH-AUTO-minute-swob.xml\n", "station: THETFORD MINES RCS, tc_id: MTH, lat: 46.049134, long: -71.266448, air_temp: 17.3\n", "url 1: https://hpfx.collab.science.gc.ca/20230527/WXO-DD/observations/swob-ml/20230527/CWPZ/2023-05-27-1459-CWPZ-AUTO-minute-swob.xml\n", "station: BURNS LAKE DECKER LAKE, tc_id: WPZ, lat: 54.383092, long: -125.95879, air_temp: 7.2\n", "url 2: https://hpfx.collab.science.gc.ca/20230527/WXO-DD/observations/swob-ml/20230527/CWRK/2023-05-27-1459-CWRK-AUTO-minute-swob.xml\n", "station: BANCROFT AUTO, tc_id: WRK, lat: 45.071498, long: -77.879936, air_temp: 22.6\n", "url 3: https://hpfx.collab.science.gc.ca/20230527/WXO-DD/observations/swob-ml/20230527/CWSV/2023-05-27-1459-CWSV-AUTO-minute-swob.xml\n", "station: BLUE RIVER CS, tc_id: WSV, lat: 52.128917, long: -119.289848, air_temp: 14.7\n", "url 4: https://hpfx.collab.science.gc.ca/20230527/WXO-DD/observations/swob-ml/20230527/CWSL/2023-05-27-1459-CWSL-AUTO-minute-swob.xml\n", "station: SALMON ARM CS, tc_id: WSL, lat: 50.703, long: -119.290677, air_temp: 15.2\n", "url 5: https://hpfx.collab.science.gc.ca/20230527/WXO-DD/observations/swob-ml/20230527/CPRJ/2023-05-27-1459-CPRJ-AUTO-minute-swob.xml\n", "station: YORKTON RCS, tc_id: PRJ, lat: 51.260003, long: -102.461318, air_temp: 14.1\n", "url 6: https://hpfx.collab.science.gc.ca/20230527/WXO-DD/observations/swob-ml/20230527/CWFE/2023-05-27-1459-CWFE-AUTO-minute-swob.xml\n", "station: ELK ISLAND NAT PARK, tc_id: WFE, lat: 53.682619, long: -112.868105, air_temp: 15.6\n", "url 7: https://hpfx.collab.science.gc.ca/20230527/WXO-DD/observations/swob-ml/20230527/CWQC/2023-05-27-1459-CWQC-AUTO-minute-swob.xml\n", "station: PORT ALBERNI AIRPORT (AUT), tc_id: WQC, lat: 49.316698, long: -124.926912, air_temp: 15.1\n", "url 8: https://hpfx.collab.science.gc.ca/20230527/WXO-DD/observations/swob-ml/20230527/CVBB/2023-05-27-1459-CVBB-AUTO-minute-swob.xml\n", "station: DELTA BURNS BOG, tc_id: VBB, lat: 49.125992, long: -123.002436, air_temp: 14.9\n", "url 9: https://hpfx.collab.science.gc.ca/20230527/WXO-DD/observations/swob-ml/20230527/CWXA/2023-05-27-1459-CWXA-AUTO-minute-swob.xml\n", "station: BOW VALLEY, tc_id: WXA, lat: 51.0831, long: -115.066, air_temp: 13.3\n", "url 10: https://hpfx.collab.science.gc.ca/20230527/WXO-DD/observations/swob-ml/20230527/CWJL/2023-05-27-1459-CWJL-AUTO-minute-swob.xml\n", "station: FORT LIARD, tc_id: WJL, lat: 60.235775, long: -123.472672, air_temp: 14.1\n" ] }, { "name": "stderr", "output_type": "stream", "text": [ "2023-05-27 11:00:23,960 [INFO] sarracenia.moth.amqp getCleanUp deleteing queue q_anonymous_fractal_SomethingHelpfulToYou\n" ] }, { "name": "stdout", "output_type": "stream", "text": [ "obtained 10 product temperatures\n" ] } ], "source": [ "import urllib.request\n", "import xml.etree.ElementTree as ET\n", "\n", "\n", "options['bindings'] = [('xpublic', [ 'v02', 'post'], \\\n", " [ '*', 'WXO-DD', 'observations', 'swob-ml', '#'] )]\n", "\n", "h = sarracenia.moth.Moth.subFactory(options)\n", "\n", "count=0\n", "\n", "while count < 10:\n", " messages = h.newMessages() #get all received Messages, upto options['batch'] of them at a time.\n", " for m in messages:\n", " dataUrl = m['baseUrl']\n", " if 'retPath' in m:\n", " dataUrl += m['retPath']\n", " else:\n", " dataUrl += m['relPath']\n", "\n", " print(\"url %d: %s\" % (count,dataUrl) )\n", " with urllib.request.urlopen( dataUrl ) as f:\n", " vxml = f.read().decode('utf-8')\n", " xmlData = ET.fromstring(vxml)\n", "\n", " stn_name=''\n", " tc_id=''\n", " lat=''\n", " lon=''\n", " air_temp=''\n", "\n", " for i in xmlData.iter():\n", " name = i.get('name')\n", " if name == 'stn_nam' :\n", " stn_name= i.get('value')\n", " elif name == 'tc_id' :\n", " tc_id = i.get('value')\n", " elif name == 'lat' :\n", " lat = i.get('value')\n", " elif name == 'long' :\n", " lon = i.get('value')\n", " elif name == 'air_temp' :\n", " air_temp = i.get('value')\n", "\n", " print( 'station: %s, tc_id: %s, lat: %s, long: %s, air_temp: %s' %\n", " ( stn_name, tc_id, lat, lon, air_temp ))\n", " h.ack(m)\n", " count += 1\n", " if count > 10:\n", " break\n", " time.sleep(1)\n", "\n", "h.cleanup() # remove server-side queue defined by Factory.\n", "h.close()\n", "print(\"obtained 10 product temperatures\")\n" ] }, { "cell_type": "markdown", "id": "neither-radius", "metadata": {}, "source": [ "## Downloading Data with Python\n", "\n", "You can use the urllib python library to download data, and then parse it.\n", "In this example, the data is an XML structure per message downloaded and read into memory.\n", "Some station data is then printed.\n", "\n", "This works well with urllib for hyper-test transport protocol resources, but other resources may be announced using other protocols, such as sftp, or ftp. The python code will need to be expanded to deal\n", "with other protocols, as well as error conditions, such as temporary failures.\n" ] }, { "cell_type": "markdown", "id": "blank-emphasis", "metadata": {}, "source": [ "## Conclusion\n", "\n", "[Sarracenia.moth.amqp](../Reference/code.rst#module-sarracenia.moth) is the lightest-weight way to add consumption of Sarracenia messages to your existing python stack. You explicitly ask for new messages when ready to use them. \n", "\n", "Things this type of integration does not provide:\n", "\n", "* data retrieval: you need your own code to download the corresponding data, \n", "\n", "* error recovery: if there are transient errors, then you need to build error recovery code (for recovering partial downloads.)\n", "\n", "* async/event/data driven: a way to say \"do this every time you get a file\" ... define callbacks to be run when a particular event happens, rather than the sequential flow shown above.\n", "\n", "The sarracenia.flow class, provides downloads, error recovery, and an asynchronous API using the sarracenia.flowcb (flowCallback) class.\n", "\n", "\n" ] }, { "cell_type": "code", "execution_count": null, "id": "senior-dressing", "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "celltoolbar": "Slideshow", "kernelspec": { "display_name": "Python 3 (ipykernel)", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.10.12" } }, "nbformat": 4, "nbformat_minor": 5 }