STOMP
If you find that this library lacks some feature you need, or you have a suggestion for improving it, please don’t hesitate to get in touch with me!
1 Introduction
This library implements the STOMP protocol, both the codec for STOMP frames and the session protocol used by clients.
The STOMP specification includes a lot of important information that’s needed to make sense of the definitions below, including such things as
what a "destination" is
how to correctly acknowledge messages received from the server
how to arrange for acknowledgements-of-receipt from the server
how to manage transactions
1.1 Destinations
When using RabbitMQ with its STOMP plugin as your STOMP server, the available destinations are
(string-append "/queue/" queue-name)
  When used with stomp-send, delivers a message directly to the named queue. When used with stomp-subscribe, consumes from the named queue (at which point it’s interesting to think about which ack-mode you want to use).
(string-append "/exchange/" exchange-name "/" routing-key)
  When used with stomp-send, delivers a message via the named exchange, using the given routing-key. When used with stomp-subscribe, creates an anonymous queue, binds it to the named exchange using routing-key as a binding pattern, and starts consuming from the anonymous queue.
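The two RabbitMQ destination forms above are plain string concatenations, so they can be wrapped in small helpers. The following is a minimal sketch; the names queue-destination and exchange-destination are hypothetical and are not part of this library.

```racket
#lang racket/base

;; Hypothetical helpers (not provided by the STOMP library) that build
;; RabbitMQ-style destination strings.

;; queue-destination : string? -> string?
(define (queue-destination queue-name)
  (string-append "/queue/" queue-name))

;; exchange-destination : string? string? -> string?
(define (exchange-destination exchange-name routing-key)
  (string-append "/exchange/" exchange-name "/" routing-key))

(queue-destination "work")                     ; "/queue/work"
(exchange-destination "amq.rabbitmq.log" "#")  ; "/exchange/amq.rabbitmq.log/#"
```

The resulting strings are what you would pass as the destination argument of stomp-send or stomp-subscribe when talking to RabbitMQ.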
ActiveMQ and other STOMP message brokers have different destination name schemata and routing behaviours.
2 Changes
Version 3.0 of this library changes the interface to many of the main API procedures, generally relying more on keyword arguments and less on positional arguments.
3 Examples
In the examples below, I’ll make use of the RabbitMQ demonstration broker service.
3.1 Sending a message
(require (planet tonyg/stomp))
(define s (stomp-connect "dev.rabbitmq.com"
                         #:login "guest"
                         #:passcode "guest"
                         #:virtual-host "/"))
(stomp-send s "/exchange/amq.rabbitmq.log/info"
            (string->bytes/utf-8 "Hello world, from Racket!"))
(stomp-disconnect s)
3.2 Sending a message, with a receipt
(require (planet tonyg/stomp))
(define s (stomp-connect "dev.rabbitmq.com"
                         #:login "guest"
                         #:passcode "guest"
                         #:virtual-host "/"))
(call-with-receipt s
  (lambda (receipt)
    (stomp-send s "/exchange/amq.rabbitmq.log/info"
                (string->bytes/utf-8 "Hello world, from Racket!")
                #:headers `((receipt ,receipt)))))
; At this point, we know the server has received our
; SEND, because we have received its RECEIPT frame. See
; the STOMP specification for details of what exact
; implications this has for whether the server has
; processed the SEND or not.
(stomp-disconnect s)
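The #:headers argument in this example is a quasiquoted list, which produces the (listof (list symbol? string?)) shape used for headers throughout the API. The sketch below shows what such a list looks like and checks its shape. The receipt value and the stomp-headers? checker are made up for illustration and are not part of the library.

```racket
#lang racket/base

;; A made-up receipt identifier; in real use the value is supplied by
;; call-with-receipt.
(define receipt "receipt-1")

;; Quasiquote builds the header list, splicing in the receipt value.
(define headers `((receipt ,receipt) (content-type "text/plain")))

;; stomp-headers? : any -> boolean?
;; Hypothetical checker for the documented header shape:
;; a list of two-element lists, each a symbol followed by a string.
(define (stomp-headers? v)
  (and (list? v)
       (andmap (lambda (h)
                 (and (list? h)
                      (= (length h) 2)
                      (symbol? (car h))
                      (string? (cadr h))))
               v)))

headers                           ; '((receipt "receipt-1") (content-type "text/plain"))
(stomp-headers? headers)          ; #t
(stomp-headers? '((receipt 1)))   ; #f, header values must be strings
```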
3.3 Subscribing to an exchange
This example uses RabbitMQ’s AMQP "wildcards" to subscribe to all messages travelling through the "amq.rabbitmq.log" exchange.
(require (planet tonyg/stomp))
(define s (stomp-connect "dev.rabbitmq.com"
                         #:login "guest"
                         #:passcode "guest"
                         #:virtual-host "/"))
; The ack-mode is given explicitly, matching the stomp-subscribe
; signature documented below.
(stomp-subscribe s "/exchange/amq.rabbitmq.log/#" "my-subscription"
                 #:ack-mode 'auto)
(let loop ()
  (let ((m (stomp-next-message s "my-subscription")))
    (pretty-print m)
    (loop)))
4 API
(require (planet tonyg/stomp:3:=0))
struct
(struct stomp-frame (command headers body) #:transparent)
command : string?
headers : (listof (list symbol? string?))
body : (or bytes? #f)
command
  The STOMP command part of the frame. For frames received from the server, this will usually be "MESSAGE". Frames sent to the server are usually constructed using the procedures described below (in particular stomp-send).
headers
  The STOMP headers sent or received with the frame. See the STOMP specification for details of the meaning of various headers.
body
  The body sent or received with the frame. This is entirely application-specific: STOMP makes no restrictions on the length or format of the body part of a frame. Note that it is a byte vector, however: make sure to use string->bytes/utf-8 and bytes->string/utf-8 as appropriate.
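Because the body is a byte vector, text has to be encoded and decoded explicitly. The following sketch uses a stand-in struct with the same three fields as stomp-frame so that it runs without the library installed; the names example-frame and frame-body-text are hypothetical.

```racket
#lang racket/base

;; Stand-in with the same fields as the documented stomp-frame struct.
;; It exists only so this sketch is self-contained.
(struct example-frame (command headers body) #:transparent)

;; frame-body-text : example-frame? -> (or/c string? #f)
;; Decodes the body as UTF-8 text, or returns #f when the frame has no body.
(define (frame-body-text frame)
  (define body (example-frame-body frame))
  (and body (bytes->string/utf-8 body)))

(define m
  (example-frame "MESSAGE"
                 '((destination "/queue/a") (message-id "42"))
                 (string->bytes/utf-8 "héllo")))

(frame-body-text m)                    ; "héllo"
(bytes-length (example-frame-body m))  ; 6, because é occupies two bytes in UTF-8
```

The byte length differing from the character count is the reason to go through string->bytes/utf-8 and bytes->string/utf-8 rather than treating the body as a string.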
procedure
(stomp-frame-header frame header [default-value]) → any?
frame : stomp-frame?
header : symbol?
default-value : any? = #f
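The signature suggests a lookup of a header by name, with default-value returned when the header is absent. The sketch below illustrates that reading on a bare header list. It is an assumption based on the signature alone, not the library's implementation, and header-ref is a hypothetical name.

```racket
#lang racket/base

;; header-ref : (listof (list symbol? string?)) symbol? [any/c] -> any/c
;; Returns the value of the first header called name, or default-value
;; if there is no such header.
(define (header-ref headers name [default-value #f])
  (cond
    [(assq name headers) => cadr]
    [else default-value]))

(define headers '((destination "/queue/a") (message-id "42")))

(header-ref headers 'message-id)         ; "42"
(header-ref headers 'receipt-id)         ; #f
(header-ref headers 'receipt-id "none")  ; "none"
```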
procedure
(stomp-connect hostname
               [#:login login
                #:passcode passcode
                #:virtual-host virtual-host
                #:port-number port-number
                #:headers headers
                #:request-versions request-versions]) → stomp-session?
hostname : string?
login : (or string? #f) = #f
passcode : (or string? #f) = #f
virtual-host : string? = hostname
port-number : (and/c exact-nonnegative-integer? (integer-in 0 65535)) = 61613
headers : (listof (list symbol? string?)) = '()
request-versions : (listof string?) = '("1.1")
> (stomp-connect "dev.rabbitmq.com" #:login "guest" #:passcode "guest" #:virtual-host "/")
Any headers given in headers are included in the CONNECT frame.
The optional request-versions argument should be a list of strings indicating the STOMP protocol versions to ask for when negotiating with the server. If omitted, it defaults to asking for STOMP version 1.1. The server will choose a protocol variant that it supports from this list. You can check the version that the server chose using stomp-session-version. If the server supports none of the requested versions, it should fall back to STOMP version 1.0. To force the use of STOMP version 1.0, pass in '() as request-versions.
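In STOMP 1.1, a client advertises the versions it supports through a comma-separated accept-version header on the CONNECT frame, and a 1.0 client sends no such header. The sketch below shows how a request-versions list could map onto that header. Whether this library builds the header in exactly this way is an assumption, and accept-version-headers is a hypothetical name.

```racket
#lang racket/base
(require racket/string)

;; accept-version-headers : (listof string?) -> (listof (list symbol? string?))
;; Builds the header list a CONNECT frame could carry for the requested
;; versions. An empty list yields no header, which corresponds to plain
;; STOMP 1.0 behaviour.
(define (accept-version-headers request-versions)
  (if (null? request-versions)
      '()
      (list (list 'accept-version (string-join request-versions ",")))))

(accept-version-headers '("1.1"))        ; '((accept-version "1.1"))
(accept-version-headers '("1.0" "1.1"))  ; '((accept-version "1.0,1.1"))
(accept-version-headers '())             ; '()
```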
procedure
(stomp-disconnect session [#:headers headers]) → void?
session : stomp-session?
headers : (listof (list symbol? string?)) = '()
procedure
(stomp-disconnect/abrupt session) → void?
session : stomp-session?
procedure
(stomp-flush session) → void?
session : stomp-session?
The following example demonstrates the use of a receipt request with a SEND operation:
> (call-with-receipt session
    (lambda (receipt)
      (stomp-send session "/queue/a"
                  (string->bytes/utf-8 "some message body")
                  #:headers `((receipt ,receipt)))))
procedure
(stomp-send-command session
                    command
                    [#:headers headers
                     #:body body
                     #:use-content-length use-content-length]) → void?
session : stomp-session?
command : string?
headers : (listof (list symbol? string?)) = '()
body : (or bytes? #f) = #f
use-content-length : (or 'default 'always 'never) = 'default
Note that this is a low-level way of sending commands to the server. Where possible, prefer the higher-level procedures such as stomp-send, stomp-subscribe, and stomp-ack-message.
If you are working with a STOMP server such as ActiveMQ that interprets the presence or absence of a content-length header as an indicator of the content type, you can use the use-content-length parameter to override the default behaviour. Setting it to 'default causes the header to be generated whenever body is non-empty. Setting it to 'always causes it to always be generated, and setting it to 'never causes it to never be generated. You should omit use-content-length unless you are certain you need to work with a broken server.
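The three settings can be summarised as a small decision rule. The sketch below restates the paragraph above as a predicate; it is an illustration of the documented behaviour rather than the library's code, and content-length-header? is a hypothetical name. It treats a body of #f the same as an empty body, which is an assumption.

```racket
#lang racket/base

;; content-length-header? : symbol? (or/c bytes? #f) -> boolean?
;; Decides whether a content-length header would be generated for a frame
;; with the given body under the given use-content-length setting.
(define (content-length-header? mode body)
  (case mode
    [(always) #t]
    [(never) #f]
    ;; 'default: only when there is a non-empty body
    [(default) (and (bytes? body) (positive? (bytes-length body)))]
    [else (error 'content-length-header? "unknown mode: ~e" mode)]))

(content-length-header? 'default #"hello")  ; #t
(content-length-header? 'default #"")       ; #f
(content-length-header? 'always #f)         ; #t
(content-length-header? 'never #"hello")    ; #f
```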
procedure
(stomp-next-frame session [block?]) → (or stomp-frame? eof-object? #f)
session : stomp-session?
block? : boolean? = #t
procedure
(stomp-next-frame/filter session predicate [block?]) → (or stomp-frame? eof-object? #f)
session : stomp-session?
predicate : (-> stomp-frame? boolean?)
block? : boolean? = #t
procedure
(stomp-next-message session subscription-id [block?]) → (or stomp-frame? eof-object? #f)
session : stomp-session?
subscription-id : (or string? #f)
block? : boolean? = #t
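The result contract allows three outcomes: a frame, an end-of-file object, or #f. A consumer loop should handle all three. The sketch below takes the message source as a thunk so that it runs without a broker. It assumes that an eof object means the connection has closed and that #f means no frame was available in non-blocking mode; the source does not spell out either meaning. The names consume-messages and make-fake-source are hypothetical.

```racket
#lang racket/base

;; consume-messages : (-> any/c) (any/c -> any) -> exact-nonnegative-integer?
;; Calls next-message repeatedly, passing each result to handle, and stops
;; at the first eof object or #f. Returns the number of messages handled.
(define (consume-messages next-message handle)
  (let loop ([count 0])
    (define m (next-message))
    (cond
      [(eof-object? m) count]  ; assumed: connection closed
      [(not m) count]          ; assumed: nothing available right now
      [else (handle m)
            (loop (add1 count))])))

;; make-fake-source : list? -> (-> any/c)
;; Test double standing in for a broker: yields each item, then eof.
(define (make-fake-source items)
  (define remaining items)
  (lambda ()
    (if (null? remaining)
        eof
        (let ([next (car remaining)])
          (set! remaining (cdr remaining))
          next))))

(define seen '())
(consume-messages (make-fake-source '("a" "b" "c"))
                  (lambda (m) (set! seen (cons m seen))))  ; 3
(reverse seen)                                             ; '("a" "b" "c")
```

With a real session s, the thunk would be something like (lambda () (stomp-next-message s "my-subscription")).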
procedure
(stomp-send session
            destination
            body
            [#:headers headers
             #:use-content-length use-content-length]) → void?
session : stomp-session?
destination : string?
body : (or bytes? #f)
headers : (listof (list symbol? string?)) = '()
use-content-length : (or 'default 'always 'never) = 'default
This is the procedure you will want to use to actually publish messages to the STOMP server.
In some cases, you may need to use the use-content-length parameter. See stomp-send-command for details. You should omit this parameter unless you are certain that you need to set it.
procedure
(stomp-send/flush session destination body [#:headers headers]) → void?
session : stomp-session?
destination : string?
body : (or bytes? #f)
headers : (listof (list symbol? string?)) = '()
procedure
(stomp-subscribe session
                 destination
                 subscription-id
                 #:ack-mode ack-mode
                 [#:headers headers]) → void?
session : stomp-session?
destination : string?
subscription-id : (or string? #f)
ack-mode : (or 'auto 'client 'client-individual)
headers : (listof (list symbol? string?)) = '()
'auto
  The server will not expect any ACK frames in response to MESSAGEs it sends.
'client
  The server will expect ACK frames, and will interpret an acknowledgement of message ID m to mean that message and all preceding messages.
'client-individual
  The server will expect ACK frames, but will interpret each such frame as acknowledging only the message ID mentioned within it.
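The difference between the cumulative 'client mode and 'client-individual is easiest to see on a concrete delivery sequence. The following sketch models the server-side interpretation described above as a pure function. It is a simplified illustration, not broker or library code, and acknowledged-ids is a hypothetical name.

```racket
#lang racket/base

;; acknowledged-ids : symbol? (listof string?) string? -> (listof string?)
;; Given the message IDs delivered so far (in delivery order) and the ID
;; named in an ACK frame, returns the IDs that the ACK covers.
(define (acknowledged-ids ack-mode delivered acked-id)
  (case ack-mode
    ;; No ACK frames are expected in 'auto mode, so an ACK covers nothing.
    [(auto) '()]
    ;; Cumulative: the named message and everything delivered before it.
    [(client)
     (let loop ([ids delivered] [acc '()])
       (cond
         [(null? ids) '()]  ; the ID was never delivered
         [(equal? (car ids) acked-id) (reverse (cons (car ids) acc))]
         [else (loop (cdr ids) (cons (car ids) acc))]))]
    ;; Individual: only the named message.
    [(client-individual)
     (if (member acked-id delivered) (list acked-id) '())]
    [else (error 'acknowledged-ids "unknown ack-mode: ~e" ack-mode)]))

(acknowledged-ids 'client '("m1" "m2" "m3") "m2")             ; '("m1" "m2")
(acknowledged-ids 'client-individual '("m1" "m2" "m3") "m2")  ; '("m2")
```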
Proceeds without waiting for a reply. To wait for a reply, supply a receipt header; see call-with-receipt.
If subscription-id is #f, then no subscription identifier will be associated with this subscription, and you may have difficulty telling which subscription any resulting MESSAGE frames relate to. You may also have difficulty cancelling such subscriptions since you will have to use an (ambiguous) destination, instead of an unambiguous subscription identifier.
procedure
(stomp-unsubscribe session
                   subscription-id
                   [#:destination destination
                    #:headers headers]) → void?
session : stomp-session?
subscription-id : (or string? #f)
destination : (or string? #f) = #f
headers : (listof (list symbol? string?)) = '()
If subscription-id is #f, then destination must be non-#f, and the cancellation request will operate using the possibly-ambiguous notion of destination instead of the unambiguous notion of a subscription identifier. Servers may differ in how they treat this situation. It is always safe to use a unique per-subscription identifier when subscribing and unsubscribing.
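One simple way to get a unique identifier per subscription is a counter-based generator. This is a minimal sketch of that idea, assuming uniqueness is only needed within a single session; make-subscription-id-generator is a hypothetical helper, not part of the library.

```racket
#lang racket/base

;; make-subscription-id-generator : string? -> (-> string?)
;; Returns a thunk that yields a fresh identifier on every call, formed
;; from the prefix and an increasing counter.
(define (make-subscription-id-generator prefix)
  (define counter 0)
  (lambda ()
    (set! counter (add1 counter))
    (format "~a-~a" prefix counter)))

(define next-id (make-subscription-id-generator "sub"))
(next-id)  ; "sub-1"
(next-id)  ; "sub-2"
```

Each generated string can be passed as subscription-id to stomp-subscribe and later to stomp-unsubscribe.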
procedure
(stomp-ack session subscription-id message-id [#:headers headers]) → void?
session : stomp-session?
subscription-id : (or string? #f)
message-id : string?
headers : (listof (list symbol? string?)) = '()
Use this procedure or stomp-ack-message to acknowledge messages received via a call to stomp-subscribe where ack-mode was either 'client or 'client-individual.
procedure
(stomp-ack-message session message [#:headers headers]) → void?
session : stomp-session?
message : stomp-frame?
headers : (listof (list symbol? string?)) = '()
procedure
(stomp-nack session subscription-id message-id [#:headers headers]) → void?
session : stomp-session?
subscription-id : (or string? #f)
message-id : string?
headers : (listof (list symbol? string?)) = '()
procedure
(stomp-begin session transaction [#:headers headers]) → void?
session : stomp-session?
transaction : string?
headers : (listof (list symbol? string?)) = '()
procedure
(stomp-commit session transaction [#:headers headers]) → void?
session : stomp-session?
transaction : string?
headers : (listof (list symbol? string?)) = '()
procedure
(stomp-abort session transaction [#:headers headers]) → void?
session : stomp-session?
transaction : string?
headers : (listof (list symbol? string?)) = '()
Start, commit, or abort a transaction, respectively. Transaction names are managed by the client. See the STOMP specification for BEGIN, COMMIT and ABORT for more information on how transaction names are used.
The procedure call-with-stomp-transaction abstracts away from some of the detail of managing transactions for you.
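The usual pattern behind such a wrapper is: begin, run the body, commit on normal return, abort if the body raises. The sketch below shows that pattern with the begin, commit and abort operations passed in as procedures, so it runs without a broker. It is an assumption about the general shape of transaction management, not a description of how call-with-stomp-transaction is implemented, and with-transaction is a hypothetical name.

```racket
#lang racket/base

;; with-transaction : string? (string? -> any) (string? -> any)
;;                    (string? -> any) (-> any) -> any
;; Runs body inside the named transaction. Commits if body returns
;; normally; aborts and re-raises if body raises an exn:fail.
(define (with-transaction name begin! commit! abort! body)
  (begin! name)
  (define result
    (with-handlers ([exn:fail? (lambda (e)
                                 (abort! name)
                                 (raise e))])
      (body)))
  (commit! name)
  result)

;; Stub operations that record what was called, in place of a real session.
(define log '())
(define (record tag)
  (lambda (name) (set! log (cons (list tag name) log))))

(with-transaction "tx-1" (record 'begin) (record 'commit) (record 'abort)
  (lambda () 'done))  ; 'done
(reverse log)         ; '((begin "tx-1") (commit "tx-1"))
```

With a real session s, the three operations would be wrappers such as (lambda (name) (stomp-begin s name)), and per the STOMP specification the frames sent inside the body would carry a transaction header naming the transaction.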
struct
(struct stomp-session (input output id server-info version buffer) #:transparent)
input : input-port?
output : output-port?
id : (or string? #f)
server-info : (or string? #f)
version : string?
buffer : queue?