A first Example using Sarracenia Moth API
Sarracenia is a package built to announce the availability of new data, usually as files. We put files on standard servers, making them available via web or sftp, and tell users that they have arrived using messages.
Sarracenia uses existing standard message passing protocols, like RabbitMQ/AMQP, to transport the messages. In message passing circles, a server that distributes messages is called a broker.
We call the combination of a message broker, and a file server (which can be a single server, or a large cluster) a data pump.
Assuming you have installed the metpx-sr3 package, either as a Debian package or via pip, one way to access announcements is to use the sarracenia.moth (Messages Organized by Topic Headers) class, which allows a Python program to connect to a Sarracenia server and start receiving messages that announce resources.
The factory to build sarracenia.moth objects requires a dictionary of settings as an argument:
options: a dictionary of other settings the class might use.
‘broker’: an object (Credential) containing a url pointing to the message server that is announcing products, and other related options.
The example below builds a call to a broker anyone can access, and requests just 5 announcements.
You can run it, and then we can discuss a few settings:
[4]:
import sarracenia.moth
import sarracenia.moth.amqp
import sarracenia.credentials
import time
import socket
options = sarracenia.moth.default_options.copy()  # copy, so the module-level defaults are left untouched.
options.update(sarracenia.moth.amqp.default_options)
options['broker'] = sarracenia.credentials.Credential('amqps://anonymous:anonymous@hpfx.collab.science.gc.ca')
options['topicPrefix'] = [ 'v02', 'post' ]
options['bindings'] = [('xpublic', ['v02', 'post'], ['#'])]
options['queueName'] = 'q_anonymous_' + socket.getfqdn() + '_SomethingHelpfulToYou'
print('options: %s' % options)
options: {'acceptUnmatched': True, 'batch': 25, 'bindings': [('xpublic', ['v02', 'post'], ['#'])], 'broker': <sarracenia.credentials.Credential object at 0x7fd5fa4000a0>, 'dry_run': False, 'exchange': None, 'expire': None, 'inline': False, 'inlineEncoding': 'guess', 'inlineByteMax': 4096, 'logFormat': '%(asctime)s [%(levelname)s] %(name)s %(funcName)s %(message)s', 'logLevel': 'info', 'messageDebugDump': False, 'message_strategy': {'reset': True, 'stubborn': True, 'failure_duration': '5m'}, 'message_ttl': 0, 'topicPrefix': ['v02', 'post'], 'tlsRigour': 'normal', 'auto_delete': False, 'durable': True, 'exchangeDeclare': True, 'prefetch': 25, 'queueName': 'q_anonymous_fractal_SomethingHelpfulToYou', 'queueBind': True, 'queueDeclare': True, 'reset': False, 'subtopic': [], 'vhost': '/'}
The broker setting is an object containing a conventional URL and other options, indicating the messaging protocol to be used to access the upstream server. When you connect to a broker, you need to tell it which messages you are interested in. In Moth, all the brokers we access are expected to use topic hierarchies. Once you have run the complete example, you can see the hierarchy in the printed messages: each dictionary has a “subtopic” element, which holds the topic with its prefix removed. Here is an example of a full topic:
v02.post.20210213.WXO-DD.observations.swob-ml.20210213.CTZR
This divides into two parts:
topicPrefix: v02.post
the rest of the topic tree is a reflection of the path to the announced product, relative to a base directory.
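As a rough illustration of that split, the sketch below takes the topic shown above apart in plain Python. The helper name split_topic, and the assumption that the prefix is always two elements long (as in v02.post), are ours and not part of the Sarracenia API.

```python
def split_topic(topic, prefix_len=2):
    """Split a dotted topic into (prefix, rest) lists.

    prefix_len=2 is an assumption that fits a prefix such as 'v02.post'.
    """
    parts = topic.split('.')
    return parts[:prefix_len], parts[prefix_len:]


topic = 'v02.post.20210213.WXO-DD.observations.swob-ml.20210213.CTZR'
prefix, rest = split_topic(topic)

# The remainder mirrors the directory of the announced product,
# relative to a base directory.
rel_dir = '/' + '/'.join(rest)

print(prefix)
print(rest)
print(rel_dir)
```

The same relationship is visible in the recorded messages, where the subtopic list matches the directory part of relPath.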
In AMQP, there is the concept of “exchanges”, which are somewhat analogous to television channels: they are groupings of announcements. So to connect to an AMQP broker, one needs to specify:
exchange: Sarracenia promulgates xpublic as a conventional default.
topicPrefix: decide which version of messages you want to obtain. This server is producing v02 ones.
subtopic: which subset of the topicPrefix messages we want to subscribe to.
Bindings
The bindings option sets out the three values above. In the example, the bindings are:
topicPrefix: v02.post (get v02 messages.)
exchange: xpublic (the default one.)
subtopic: # ( an AMQP wildcard meaning everything. )
We connect to the amqps://hpfx.collab.science.gc.ca broker, on the xpublic exchange, and we are interested in all messages matching the v02.post.# topic specification (which is all v02 messages available.)
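To make the mapping concrete, the sketch below joins one bindings entry into the dotted pattern that shows up in the log output of the examples ("binding ... with v02.post.# to xpublic"). binding_pattern is a hypothetical helper written for this illustration; Moth does this work internally, and its implementation may differ.

```python
def binding_pattern(binding):
    """Return (exchange, pattern) for one (exchange, topicPrefix, subtopic) tuple.

    Hypothetical helper for illustration, not a Sarracenia function.
    """
    exchange, topic_prefix, subtopic = binding
    return exchange, '.'.join(topic_prefix + subtopic)


bindings = [('xpublic', ['v02', 'post'], ['#'])]

for b in bindings:
    exchange, pattern = binding_pattern(b)
    print(f"bind to exchange {exchange} with pattern {pattern}")
```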
subtopic
The subtopic here ( # ) matches everything produced on the server. The wider the subtopic, the more messages have to be sent, and the more processing has to be done, so it is better to make it narrower. Taking the example above, if we are interested in swob, a subtopic like:
*.WXO-DD.observations.swob-ml.#
would match all of the swobs similar to the one above, but avoid sending messages for non-swobs to you.
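To get a feel for how the wildcards behave, here is a small stand-alone matcher. It is only a sketch of AMQP topic matching as we understand it ( * stands for exactly one word, # for zero or more words); the broker does the real matching, and topic_matches is a name made up for this illustration.

```python
def topic_matches(pattern, topic):
    """AMQP topic-style match: '*' is exactly one word, '#' is zero or more words."""
    def match(p, t):
        if not p:
            return not t
        if p[0] == '#':
            # '#' may absorb zero words, or absorb one word and try again.
            return match(p[1:], t) or (bool(t) and match(p, t[1:]))
        if not t:
            return False
        if p[0] == '*' or p[0] == t[0]:
            return match(p[1:], t[1:])
        return False

    return match(pattern.split('.'), topic.split('.'))


# The pattern is the topic prefix followed by the narrower subtopic.
pattern = 'v02.post.*.WXO-DD.observations.swob-ml.#'
swob = 'v02.post.20210213.WXO-DD.observations.swob-ml.20210213.CTZR'
grib = 'v02.post.20230527.WXO-DD.model_gem_regional.10km.grib2.12.037'

print(topic_matches(pattern, swob))
print(topic_matches(pattern, grib))
print(topic_matches('v02.post.#', grib))
```

The swob topic matches the narrow pattern and the GRIB topic does not, while both match v02.post.#.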
queueName
By convention in brokers administered by Sarracenia, users can only create queues that start with q_ followed by their user name. We connected as anonymous, so q_anonymous must be used. After that, the rest can be whatever you want, but there are a few considerations:
If you want to start up multiple Python processes to share a data feed, have them all specify the same queueName, and they will share the flow of messages. This scales well for a few dozen co-operating downloaders, but not indefinitely: do not expect more than 99 or so processes to share the load from a single queue effectively. To scale beyond that with AMQP, multiple selections are better.
If you are going to ask for help from the data pump admins, you will need to give them the name of the queue, and they may need to be able to pick it out of hundreds or thousands that are on the server.
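One possible way to follow these conventions is sketched below: the name starts with q_ and the user name, then carries the host name and a short description of the purpose, so that an administrator can recognize it. Both helper functions and the purpose string are hypothetical, chosen for this illustration.

```python
import socket


def make_queue_name(user, purpose, host=None):
    """Build a queue name of the form q_<user>_<host>_<purpose>."""
    host = host or socket.getfqdn()
    return f"q_{user}_{host}_{purpose}"


def follows_convention(queue_name, user):
    """Check the 'q_ followed by the user name' rule described above."""
    return queue_name.startswith(f"q_{user}")


name = make_queue_name('anonymous', 'swob_temperatures')
print(name)
print(follows_convention(name, 'anonymous'))
```

Processes that should share one feed would all be given the same name; processes that should each see every message would need different names.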
Messages
Different messaging protocols have different storage structures and conventions. The Moth class returns messages as Python dictionaries, regardless of which protocol is used to obtain them or, if forwarding them, to transmit them. One can add fields for programmatic use to the messages just by adding elements to the dictionary. If they are only for internal use, then add the name of the dictionary element to the special ‘_deleteOnPost’ key, so that the dictionary element will be dropped when forwarding the message.
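The effect of ‘_deleteOnPost’ can be pictured with a plain dictionary. In the sketch below, strip_for_post is a stand-in we wrote to mimic the behaviour described above; it is not the library's own code, and the local_file field is a made-up example of an internal-use element. Dropping the ‘_deleteOnPost’ key itself is also an assumption.

```python
def strip_for_post(msg):
    """Return a copy of msg without the keys named in '_deleteOnPost'.

    Illustration only. Assumes the '_deleteOnPost' key itself is dropped too.
    """
    drop = set(msg.get('_deleteOnPost', ())) | {'_deleteOnPost'}
    return {k: v for k, v in msg.items() if k not in drop}


# A cut-down message, shaped like the ones printed by the examples.
msg = {
    'baseUrl': 'https://hpfx.collab.science.gc.ca',
    'relPath': '/20230527/WXO-DD/observations/swob-ml/20230527/CMTH/'
               '2023-05-27-1459-CMTH-AUTO-minute-swob.xml',
    'exchange': 'xpublic',
    '_deleteOnPost': {'exchange'},
}

# Add a field for our own use, and mark it as internal.
msg['local_file'] = '/tmp/swob.xml'      # hypothetical field
msg['_deleteOnPost'].add('local_file')

posted = strip_for_post(msg)
print(sorted(posted))
```

The original dictionary keeps the extra field for local processing; only the forwarded copy loses it.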
Ack
Messages are marked as in transit by the broker. If you do not acknowledge them, the data pump holds onto them and eventually re-dispatches them. Keeping pending messages in memory also slows down the processing of all messages. Acknowledge receipt of messages as soon as practicable, but not so soon that you lose data if the program is interrupted. In the example, we acknowledge after we have done our work of printing the message.
[6]:
h = sarracenia.moth.Moth.subFactory(options)
count = 0
good = 0
while count < 5:
    m = h.getNewMessage()  # get only one message
    if m is not None:
        print("message %d: %s" % (count, m))
        content = m.getContent()
        print("first 50 bytes of corresponding file: %s" % content[0:50])
        good += 1
        h.ack(m)  # acknowledge only after the work is done.
    time.sleep(0.1)
    count += 1
h.cleanup()  # remove server-side queue defined by Factory.
h.close()
print(f"obtained {good} product announcements")
2023-05-27 10:55:22,559 [INFO] sarracenia.moth.amqp __getSetup queue declared q_anonymous_fractal_SomethingHelpfulToYou (as: amqps://anonymous@hpfx.collab.science.gc.ca)
2023-05-27 10:55:22,560 [INFO] sarracenia.moth.amqp __getSetup binding q_anonymous_fractal_SomethingHelpfulToYou with v02.post.# to xpublic (as: amqps://anonymous@hpfx.collab.science.gc.ca)
message 0: {'_format': 'v02', '_deleteOnPost': {'subtopic', 'exchange', 'ack_id', 'local_offset', '_format'}, 'sundew_extension': 'CMC:REGIONAL:GRIB2:BIN::20230527145518', 'from_cluster': 'DDSR.CMC', 'to_clusters': 'ALL', 'filename': 'msg_ddsr-WXO-DD_34df392aeffc9c678011f3fd30193bb6:CMC:REGIONAL:GRIB2:BIN::20230527145518', 'source': 'WXO-DD', 'mtime': '20230527T145520.791', 'atime': '20230527T145520.791', 'pubTime': '20230527T145520.791', 'baseUrl': 'https://hpfx.collab.science.gc.ca', 'relPath': '/20230527/WXO-DD/model_gem_regional/10km/grib2/12/037/CMC_reg_WIND_ISBL_30_ps10km_2023052712_P037.grib2', 'subtopic': ['20230527', 'WXO-DD', 'model_gem_regional', '10km', 'grib2', '12', '037'], 'identity': {'method': 'md5', 'value': 'U1vVZnatrCeK3bLrXshb2g=='}, 'size': 554100, 'exchange': 'xpublic', 'ack_id': 1, 'local_offset': 0}
first 50 bytes of corresponding file: b'GRIB\x00\x00\x00\x02\x00\x00\x00\x00\x00\x08tt\x00\x00\x00\x15\x01\x006\x00\x00\x04\x00\x01\x07\xe7\x05\x1b\x0c\x00\x00\x00\x02\x00\x00\x00A\x03\x00\x00\x0b\xc1\x88\x00\x00\x00'
message 1: {'_format': 'v02', '_deleteOnPost': {'subtopic', 'exchange', 'ack_id', 'local_offset', '_format'}, 'sundew_extension': 'CMC:REGIONAL:GRIB2:BIN::20230527145519', 'from_cluster': 'DDSR.CMC', 'to_clusters': 'ALL', 'filename': 'msg_ddsr-WXO-DD_752eb4e8803503704990563d84030e67:CMC:REGIONAL:GRIB2:BIN::20230527145519', 'source': 'WXO-DD', 'mtime': '20230527T145520.292', 'atime': '20230527T145520.292', 'pubTime': '20230527T145520.292', 'baseUrl': 'https://hpfx.collab.science.gc.ca', 'relPath': '/20230527/WXO-DD/model_gem_regional/10km/grib2/12/037/CMC_reg_HGT_ISBL_250_ps10km_2023052712_P037.grib2', 'subtopic': ['20230527', 'WXO-DD', 'model_gem_regional', '10km', 'grib2', '12', '037'], 'identity': {'method': 'md5', 'value': 'j6bh9dbE4QbJAXEOejw0Tw=='}, 'size': 377005, 'exchange': 'xpublic', 'ack_id': 2, 'local_offset': 0}
first 50 bytes of corresponding file: b'GRIB\x00\x00\x00\x02\x00\x00\x00\x00\x00\x05\xc0\xad\x00\x00\x00\x15\x01\x006\x00\x00\x04\x00\x01\x07\xe7\x05\x1b\x0c\x00\x00\x00\x02\x00\x00\x00A\x03\x00\x00\x0b\xc1\x88\x00\x00\x00'
message 2: {'_format': 'v02', '_deleteOnPost': {'subtopic', 'exchange', 'ack_id', 'local_offset', '_format'}, 'sundew_extension': 'CMC:REGIONAL:GRIB2:BIN::20230527145519', 'from_cluster': 'DDSR.CMC', 'to_clusters': 'ALL', 'filename': 'msg_ddsr-WXO-DD_55f121bb28e822cffb6e61196cd924eb:CMC:REGIONAL:GRIB2:BIN::20230527145519', 'source': 'WXO-DD', 'mtime': '20230527T145521.260', 'atime': '20230527T145521.260', 'pubTime': '20230527T145521.260', 'baseUrl': 'https://hpfx.collab.science.gc.ca', 'relPath': '/20230527/WXO-DD/model_gem_regional/10km/grib2/12/037/CMC_reg_RH_ISBL_700_ps10km_2023052712_P037.grib2', 'subtopic': ['20230527', 'WXO-DD', 'model_gem_regional', '10km', 'grib2', '12', '037'], 'identity': {'method': 'md5', 'value': 'V7goy/doL6Gle68s1zoVEA=='}, 'size': 808438, 'exchange': 'xpublic', 'ack_id': 3, 'local_offset': 0}
first 50 bytes of corresponding file: b'GRIB\x00\x00\x00\x02\x00\x00\x00\x00\x00\x0cU\xf6\x00\x00\x00\x15\x01\x006\x00\x00\x04\x00\x01\x07\xe7\x05\x1b\x0c\x00\x00\x00\x02\x00\x00\x00A\x03\x00\x00\x0b\xc1\x88\x00\x00\x00'
message 3: {'_format': 'v02', '_deleteOnPost': {'subtopic', 'exchange', 'ack_id', 'local_offset', '_format'}, 'sundew_extension': 'CMC:REGIONAL:GRIB2:BIN::20230527145518', 'from_cluster': 'DDSR.CMC', 'to_clusters': 'ALL', 'filename': 'msg_ddsr-WXO-DD_dac300cf33756ba816e030f99fc9dc22:CMC:REGIONAL:GRIB2:BIN::20230527145518', 'source': 'WXO-DD', 'mtime': '20230527T145519.586', 'atime': '20230527T145519.586', 'pubTime': '20230527T145519.586', 'baseUrl': 'https://hpfx.collab.science.gc.ca', 'relPath': '/20230527/WXO-DD/model_gem_regional/10km/grib2/12/037/CMC_reg_UGRD_ISBL_225_ps10km_2023052712_P037.grib2', 'subtopic': ['20230527', 'WXO-DD', 'model_gem_regional', '10km', 'grib2', '12', '037'], 'identity': {'method': 'md5', 'value': 'MI8XzT1uam5OUf7QlDZ4FA=='}, 'size': 487411, 'exchange': 'xpublic', 'ack_id': 4, 'local_offset': 0}
first 50 bytes of corresponding file: b'GRIB\x00\x00\x00\x02\x00\x00\x00\x00\x00\x07o\xf3\x00\x00\x00\x15\x01\x006\x00\x00\x04\x00\x01\x07\xe7\x05\x1b\x0c\x00\x00\x00\x02\x00\x00\x00A\x03\x00\x00\x0b\xc1\x88\x00\x00\x00'
message 4: {'_format': 'v02', '_deleteOnPost': {'subtopic', 'exchange', 'ack_id', 'local_offset', '_format'}, 'sundew_extension': 'CMC:REGIONAL:GRIB2:BIN::20230527145519', 'from_cluster': 'DDSR.CMC', 'to_clusters': 'ALL', 'filename': 'msg_ddsr-WXO-DD_c5e84748169e0a6dce8f3b884ffdf059:CMC:REGIONAL:GRIB2:BIN::20230527145519', 'source': 'WXO-DD', 'mtime': '20230527T145520.651', 'atime': '20230527T145520.651', 'pubTime': '20230527T145520.651', 'baseUrl': 'https://hpfx.collab.science.gc.ca', 'relPath': '/20230527/WXO-DD/model_gem_regional/10km/grib2/12/037/CMC_reg_RH_ISBL_550_ps10km_2023052712_P037.grib2', 'subtopic': ['20230527', 'WXO-DD', 'model_gem_regional', '10km', 'grib2', '12', '037'], 'identity': {'method': 'md5', 'value': 'zukdtksA5I0C5oq/ieiXbQ=='}, 'size': 774394, 'exchange': 'xpublic', 'ack_id': 5, 'local_offset': 0}
2023-05-27 10:55:24,022 [INFO] sarracenia.moth.amqp getCleanUp deleteing queue q_anonymous_fractal_SomethingHelpfulToYou
first 50 bytes of corresponding file: b'GRIB\x00\x00\x00\x02\x00\x00\x00\x00\x00\x0b\xd0\xfa\x00\x00\x00\x15\x01\x006\x00\x00\x04\x00\x01\x07\xe7\x05\x1b\x0c\x00\x00\x00\x02\x00\x00\x00A\x03\x00\x00\x0b\xc1\x88\x00\x00\x00'
obtained 5 product announcements
The second example combines baseUrl with relPath (or with retPath, when the message has one) to build the URL of each announced product, and then retrieves the data. It uses newMessages() instead of getNewMessage() to show the alternative way of consuming messages. The products here are retrieved over http; retrieval varies depending on the protocol listed in the baseUrl, and can get complicated.
[7]:
import urllib.request
import xml.etree.ElementTree as ET
# narrow the binding to swob-ml observations only.
options['bindings'] = [('xpublic', ['v02', 'post'],
                        ['*', 'WXO-DD', 'observations', 'swob-ml', '#'])]
h = sarracenia.moth.Moth.subFactory(options)
count = 0
while count < 10:
    messages = h.newMessages()  # get all received messages, up to options['batch'] of them at a time.
    for m in messages:
        # build the download URL: use retPath when the message has one, otherwise relPath.
        dataUrl = m['baseUrl']
        if 'retPath' in m:
            dataUrl += m['retPath']
        else:
            dataUrl += m['relPath']
        print("url %d: %s" % (count, dataUrl))
        with urllib.request.urlopen(dataUrl) as f:
            vxml = f.read().decode('utf-8')
        xmlData = ET.fromstring(vxml)
        stn_name = ''
        tc_id = ''
        lat = ''
        lon = ''
        air_temp = ''
        # pick the elements of interest out of the XML by their name attribute.
        for i in xmlData.iter():
            name = i.get('name')
            if name == 'stn_nam':
                stn_name = i.get('value')
            elif name == 'tc_id':
                tc_id = i.get('value')
            elif name == 'lat':
                lat = i.get('value')
            elif name == 'long':
                lon = i.get('value')
            elif name == 'air_temp':
                air_temp = i.get('value')
        print('station: %s, tc_id: %s, lat: %s, long: %s, air_temp: %s' %
              (stn_name, tc_id, lat, lon, air_temp))
        h.ack(m)
        count += 1
        if count >= 10:  # stop after exactly 10 products, even in the middle of a batch.
            break
    time.sleep(1)
h.cleanup()  # remove server-side queue defined by Factory.
h.close()
print(f"obtained {count} product temperatures")
2023-05-27 11:00:20,655 [INFO] sarracenia.moth.amqp __getSetup queue declared q_anonymous_fractal_SomethingHelpfulToYou (as: amqps://anonymous@hpfx.collab.science.gc.ca)
2023-05-27 11:00:20,656 [INFO] sarracenia.moth.amqp __getSetup binding q_anonymous_fractal_SomethingHelpfulToYou with v02.post.*.WXO-DD.observations.swob-ml.# to xpublic (as: amqps://anonymous@hpfx.collab.science.gc.ca)
url 0: https://hpfx.collab.science.gc.ca/20230527/WXO-DD/observations/swob-ml/20230527/CMTH/2023-05-27-1459-CMTH-AUTO-minute-swob.xml
station: THETFORD MINES RCS, tc_id: MTH, lat: 46.049134, long: -71.266448, air_temp: 17.3
url 1: https://hpfx.collab.science.gc.ca/20230527/WXO-DD/observations/swob-ml/20230527/CWPZ/2023-05-27-1459-CWPZ-AUTO-minute-swob.xml
station: BURNS LAKE DECKER LAKE, tc_id: WPZ, lat: 54.383092, long: -125.95879, air_temp: 7.2
url 2: https://hpfx.collab.science.gc.ca/20230527/WXO-DD/observations/swob-ml/20230527/CWRK/2023-05-27-1459-CWRK-AUTO-minute-swob.xml
station: BANCROFT AUTO, tc_id: WRK, lat: 45.071498, long: -77.879936, air_temp: 22.6
url 3: https://hpfx.collab.science.gc.ca/20230527/WXO-DD/observations/swob-ml/20230527/CWSV/2023-05-27-1459-CWSV-AUTO-minute-swob.xml
station: BLUE RIVER CS, tc_id: WSV, lat: 52.128917, long: -119.289848, air_temp: 14.7
url 4: https://hpfx.collab.science.gc.ca/20230527/WXO-DD/observations/swob-ml/20230527/CWSL/2023-05-27-1459-CWSL-AUTO-minute-swob.xml
station: SALMON ARM CS, tc_id: WSL, lat: 50.703, long: -119.290677, air_temp: 15.2
url 5: https://hpfx.collab.science.gc.ca/20230527/WXO-DD/observations/swob-ml/20230527/CPRJ/2023-05-27-1459-CPRJ-AUTO-minute-swob.xml
station: YORKTON RCS, tc_id: PRJ, lat: 51.260003, long: -102.461318, air_temp: 14.1
url 6: https://hpfx.collab.science.gc.ca/20230527/WXO-DD/observations/swob-ml/20230527/CWFE/2023-05-27-1459-CWFE-AUTO-minute-swob.xml
station: ELK ISLAND NAT PARK, tc_id: WFE, lat: 53.682619, long: -112.868105, air_temp: 15.6
url 7: https://hpfx.collab.science.gc.ca/20230527/WXO-DD/observations/swob-ml/20230527/CWQC/2023-05-27-1459-CWQC-AUTO-minute-swob.xml
station: PORT ALBERNI AIRPORT (AUT), tc_id: WQC, lat: 49.316698, long: -124.926912, air_temp: 15.1
url 8: https://hpfx.collab.science.gc.ca/20230527/WXO-DD/observations/swob-ml/20230527/CVBB/2023-05-27-1459-CVBB-AUTO-minute-swob.xml
station: DELTA BURNS BOG, tc_id: VBB, lat: 49.125992, long: -123.002436, air_temp: 14.9
url 9: https://hpfx.collab.science.gc.ca/20230527/WXO-DD/observations/swob-ml/20230527/CWXA/2023-05-27-1459-CWXA-AUTO-minute-swob.xml
station: BOW VALLEY, tc_id: WXA, lat: 51.0831, long: -115.066, air_temp: 13.3
2023-05-27 11:00:23,960 [INFO] sarracenia.moth.amqp getCleanUp deleteing queue q_anonymous_fractal_SomethingHelpfulToYou
obtained 10 product temperatures
Downloading Data with Python
You can use the urllib Python library to download data, and then parse it. In this example, each message leads to one XML document, which is downloaded and read into memory. Some station data is then printed.
This works well with urllib for hypertext transfer protocol (HTTP) resources, but other resources may be announced using other protocols, such as sftp or ftp. The Python code will need to be expanded to deal with other protocols, as well as with error conditions, such as temporary failures.
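As a minimal sketch of what such an expansion might look like, the function below checks the protocol in the URL and retries temporary failures a few times. The function name, the retry count, the delay and the choice to give up immediately on HTTP errors below 500 are all assumptions made for this illustration; only http and https are handled.

```python
import time
import urllib.error
import urllib.parse
import urllib.request


def fetch(url, attempts=3, delay=1.0, opener=urllib.request.urlopen):
    """Download url and return its bytes, retrying temporary failures.

    Hypothetical helper: other protocols (sftp, ftp) would need their own code.
    """
    scheme = urllib.parse.urlparse(url).scheme
    if scheme not in ('http', 'https'):
        raise ValueError(f"no download code for protocol {scheme!r}")

    last_error = None
    for attempt in range(1, attempts + 1):
        try:
            with opener(url) as f:
                return f.read()
        except urllib.error.HTTPError as err:
            if err.code < 500:
                raise  # e.g. 404: trying again is unlikely to help.
            last_error = err
        except OSError as err:  # URLError and socket errors are OSError subclasses.
            last_error = err
        if attempt < attempts:
            time.sleep(delay * attempt)  # wait a little longer each time.
    raise last_error
```

In the second example, the urlopen() call could then be replaced by something like vxml = fetch(dataUrl).decode('utf-8'). The opener argument is there only so that the retry logic can be exercised without a network connection.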
Conclusion
Sarracenia.moth.amqp is the lightest-weight way to add consumption of Sarracenia messages to your existing python stack. You explicitly ask for new messages when ready to use them.
Things this type of integration does not provide:
data retrieval: you need your own code to download the corresponding data,
error recovery: if there are transient errors, then you need to build error recovery code (for recovering partial downloads.)
async/event/data driven: a way to say “do this every time you get a file” … define callbacks to be run when a particular event happens, rather than the sequential flow shown above.
The sarracenia.flow class provides downloads, error recovery, and an asynchronous API using the sarracenia.flowcb (flowCallback) class.