Splunk Cloud Platform

Developing Views and Apps for Splunk Web

Modular inputs examples

These examples use Python for the scripting language. However, you can use various other scripting languages to implement modular inputs.

Note: Splunk Universal Forwarder, unlike other Splunk instances, does not provide a Python interpreter. In this case, to run these examples you may need to install Python on the server if one is not already available.


Twitter example

The Twitter example streams JSON data from a Twitter source to the Splunk platform for indexing.

Note: The example uses Tweepy, a Python library, to access the Twitter source. Tweepy libraries must be available to the Splunk Twitter example script, twitter.py. To run the example, download and install Tweepy.


Twitter example script

Place the twitter.py script in the following location in your Splunk installation:

$SPLUNK_HOME/etc/apps/twitter/bin/twitter.py

Refer to Scripts for modular inputs for analysis of specific parts of the script. This script has been made cross-compatible with Python 2 and Python 3 using python-future.

twitter.py

from __future__ import print_function
from future import standard_library
standard_library.install_aliases()
from builtins import str
import tweepy, sys
import xml.dom.minidom, xml.sax.saxutils
from tweepy.utils import import_simplejson
json = import_simplejson()
from tweepy.models import Status
import logging
import splunk.entity as entity

import http.client
from socket import timeout
from tweepy.auth import BasicAuthHandler
from tweepy.api import API

# Set up logging suitable for splunkd consumption: every record is written
# by a StreamHandler (stderr by default) as "<LEVEL> <message>".
formatter = logging.Formatter('%(levelname)s %(message)s')
handler = logging.StreamHandler()
handler.setFormatter(formatter)

# Attach the handler to the root logger and let everything through.
logging.getLogger().setLevel(logging.DEBUG)
logging.getLogger().addHandler(handler)

# XML introspection scheme for this modular input.  It is printed verbatim by
# do_scheme() when the script is invoked with --scheme and declares the
# input's title, description and its three arguments: name, username and
# password.
SCHEME = """<scheme>
    <title>Twitter</title>
    <description>Get data from Twitter.</description>
    <use_external_validation>true</use_external_validation>
    <streaming_mode>simple</streaming_mode>
    <endpoint>
        <args>
            <arg name="name">
                <title>Twitter feed name</title>
                <description>Name of the current feed using the user credentials supplied.</description>
            </arg>

            <arg name="username">
                <title>Twitter ID/Handle</title>
                <description>Your Twitter ID.</description>
            </arg>
            <arg name="password">
                <title>Password</title>
                <description>Your twitter password</description>
            </arg>
        </args>
    </endpoint>
</scheme>
"""

def do_scheme():
    """Print the introspection scheme (SCHEME) to stdout."""
    scheme_xml = SCHEME
    print(scheme_xml)

# prints XML error data to be consumed by Splunk
def print_error(s):
    """Print *s*, XML-escaped, wrapped in <error><message>...</message></error>."""
    escaped = xml.sax.saxutils.escape(s)
    document = "<error><message>%s</message></error>" % escaped
    print(document)

class SplunkListener( tweepy.StreamListener ):
    """Stream listener that writes received tweets to stdout as JSON.

    The original body mixed tab- and space-indented lines, which Python 3
    rejects with a TabError; the class is now indented with spaces only.
    """

    def on_data(self, data):
        """Handle one raw message from the stream.

        The message is first passed to the base-class handler, then decoded
        as JSON; it is printed (re-serialized) only when it has a 'text'
        key.  Always returns True.
        """
        super( SplunkListener, self).on_data( data )
        twt = json.loads(data)
        if 'text' in twt:
            print(json.dumps(twt))
        return True

    def on_error(self, status_code):
        """Called when a non-200 status code is returned"""
        print('got error\n')
        print(status_code)
        # Lazy logging arguments: also safe if status_code is a tuple.
        logging.error("got error: %s", status_code)
        # NOTE(review): False presumably tells tweepy to stop streaming --
        # confirm against the tweepy version in use.
        return False

    def on_timeout(self):
        """Called when stream connection times out"""
        print('got timeout')
        logging.info("Got a timeout")
        return

def validate_conf(config, key):
    """Raise an Exception naming *key* unless it is present in *config*."""
    if key in config:
        return
    message = "Invalid configuration received from Splunk: key '%s' is missing." % key
    raise Exception(message)

#read XML configuration passed from splunkd
def get_config():
    """Read the input configuration XML from stdin and return it as a dict.

    The dict holds the stanza name under "name", one entry per <param>
    element that has text content, and "checkpoint_dir".

    Raises:
        Exception: if stdin cannot be parsed, or if any of the required keys
            (name, username, password, checkpoint_dir) is missing.  The
            message is prefixed with
            "Error getting Splunk configuration via STDIN: ".
    """
    config = {}

    try:
        # read everything from stdin
        config_str = sys.stdin.read()

        # parse the config XML
        doc = xml.dom.minidom.parseString(config_str)
        root = doc.documentElement

        # getElementsByTagName() returns an empty list for a missing element.
        # Indexing [0] unconditionally turned that into an opaque
        # "list index out of range"; checking first lets the required-key
        # validation below report what is actually missing.
        conf_nodes = root.getElementsByTagName("configuration")
        stanzas = []
        if conf_nodes:
            logging.debug("XML: found configuration")
            stanzas = conf_nodes[0].getElementsByTagName("stanza")

        if stanzas:
            stanza = stanzas[0]
            stanza_name = stanza.getAttribute("name")
            if stanza_name:
                logging.debug("XML: found stanza " + stanza_name)
                config["name"] = stanza_name

                for param in stanza.getElementsByTagName("param"):
                    param_name = param.getAttribute("name")
                    logging.debug("XML: found param '%s'" % param_name)
                    value_node = param.firstChild
                    if param_name and value_node is not None and \
                       value_node.nodeType == value_node.TEXT_NODE:
                        data = value_node.data
                        config[param_name] = data
                        # never write the password itself to the log
                        shown = "<redacted>" if param_name == "password" else data
                        logging.debug("XML: '%s' -> '%s'" % (param_name, shown))

        checkpnt_nodes = root.getElementsByTagName("checkpoint_dir")
        if checkpnt_nodes:
            checkpnt_text = checkpnt_nodes[0].firstChild
            if checkpnt_text is not None and \
               checkpnt_text.nodeType == checkpnt_text.TEXT_NODE:
                config["checkpoint_dir"] = checkpnt_text.data

        if not config:
            raise Exception("Invalid configuration received from Splunk.")

        # just some validation: make sure these keys are present (required)
        validate_conf(config, "name")
        validate_conf(config, "username")
        validate_conf(config, "password")
        validate_conf(config, "checkpoint_dir")
    except Exception as e:
        raise Exception("Error getting Splunk configuration via STDIN: %s" % str(e))

    return config

def get_validation_data():
    """Parse the validation XML on stdin into a flat dict.

    The name attribute of the first <item> is stored under "stanza"; every
    <param> of that item with a name and text content is stored under its
    own name.  An IndexError propagates if there is no <item> element.
    """
    result = {}

    # read everything from stdin and parse the validation XML
    document = xml.dom.minidom.parseString(sys.stdin.read())
    top = document.documentElement

    logging.debug("XML: found items")
    item = top.getElementsByTagName("item")[0]
    if not item:
        return result

    logging.debug("XML: found item")
    result["stanza"] = item.getAttribute("name")

    for param in item.getElementsByTagName("param"):
        key = param.getAttribute("name")
        logging.debug("Found param %s" % key)
        child = param.firstChild
        if key and child and child.nodeType == child.TEXT_NODE:
            result[key] = child.data

    return result

# parse the twitter error string and extract the message
def get_twitter_error(s):
    """Return the text of the first <Message> element found in *s*.

    Returns "" when *s* is XML without a usable <Message>, and *s* itself
    when it is not well-formed XML.
    """
    try:
        document = xml.dom.minidom.parseString(s)
    except xml.parsers.expat.ExpatError:
        # not XML at all: hand the raw text back to the caller
        return s

    found = document.documentElement.getElementsByTagName("Message")
    if not found:
        return ""
    first = found[0].firstChild
    if first and first.nodeType == first.TEXT_NODE:
        return first.data
    return ""

def validate_config(username,password):
    """Check the credentials with one request to the Twitter streaming host.

    Returns normally when the request answers with HTTP 200.  On any
    failure an <error> message is printed for Splunk and the process exits
    with status 1.

    Changes from the original: consistent space indentation (the mixed
    tabs/spaces are a TabError on Python 3), the error is logged *before*
    raising (it was unreachable) and without the password, and the
    connection is closed on every path.
    """
    conn = None
    try:
        auth = BasicAuthHandler(username,password)
        headers = {}
        host = 'stream.twitter.com'
        url = '/1/statuses/sample.json?delimited=length'
        body = None
        # named so it does not shadow the module-level socket.timeout import
        timeout_secs = 5.0
        auth.apply_auth(None,None, headers, None)
        conn = http.client.HTTPSConnection(host)
        conn.connect()
        conn.sock.settimeout(timeout_secs)
        conn.request('POST', url, body, headers=headers)
        resp = conn.getresponse()
        if resp.status != 200:
            logging.error("Invalid twitter credentials for user %s", username)
            raise Exception("HTTP request to Twitter returned with status code %d (%s): %s" % (resp.status,resp.reason, get_twitter_error(resp.read())))
    except Exception as e:
        print_error("Invalid configuration specified: %s" % str(e))
        sys.exit(1)
    finally:
        if conn is not None:
            conn.close()

def run():
    """Read the configuration from stdin, validate it and start streaming.

    The body is indented with spaces only (the original mixed tabs and
    spaces, a TabError on Python 3) and no longer logs the password.
    """
    config = get_config()

    username = config["username"]
    password = config["password"]

    # Validate username and password before starting splunk listener.
    # Only the user name is logged; the password must not reach the log.
    logging.debug("Credentials found: username = %s", username)
    validate_config(username, password)

    listener = SplunkListener()
    stream = tweepy.Stream( username,password, listener )
    stream.sample()

if __name__ == '__main__':
    if len(sys.argv) > 1:
        if sys.argv[1] == "--scheme":
            do_scheme()
        elif sys.argv[1] == "--validate-arguments":
            if len(sys.argv)>3:
                # credentials supplied on the command line (manual testing)
                validate_config(sys.argv[2],sys.argv[3])
            else:
                # No credentials on the command line: read the validation
                # XML from stdin via get_validation_data().  Previously this
                # branch only printed a hint and exited 0, so nothing was
                # ever validated in that case.
                try:
                    val_data = get_validation_data()
                    missing = [k for k in ("username", "password") if k not in val_data]
                    if missing:
                        raise Exception("missing required argument(s): %s" % ", ".join(missing))
                except Exception as e:
                    print_error("Invalid configuration specified: %s" % str(e))
                    sys.exit(1)
                validate_config(val_data["username"], val_data["password"])
        elif sys.argv[1] == "--test":
            print('No tests for the scheme present')
        else:
            print('You giveth weird arguments')
    else:
        # just request data from Twitter
        run()

    sys.exit(0)

Twitter example spec file

Place the following spec file in the following location:

$SPLUNK_HOME/etc/apps/twitter/README/inputs.conf.spec

inputs.conf.spec

[twitter://default]
*This is how the Twitter app is configured

username = <value>
*This is the user's twitter username/handle

password = <value>
*This is the user's password used for logging into twitter

Sample JSON input for the Twitter example

Here is an example of the JSON input from Twitter that the Twitter example indexes:

{"contributors":null,"text":"@CraZiiBoSSx3 Yea ... Lo_Ok  http://twitpic.com/19ksg2","created_at":"Fri Mar 19 18:41:17 +0000 2010","truncated":false,"coordinates":null,"in_reply_to_screen_name":"CraZiiBoSSx3","favorited":false,"geo":null,"in_reply_to_status_id":10735405186,"source":"<a href=\"http://echofon.com/\" rel=\"nofollow\">Echofon</a>","place":null,"in_reply_to_user_id":114199314,"user":{"created_at":"Mon Dec 21 23:01:05 +0000 2009","profile_background_color":"0099B9","favourites_count":0,"lang":"en","profile_text_color":"3C3940","location":"my location !","following":null,"time_zone":"Central Time (US & Canada)","description":"Names GiiqqL3z; ;) ; Unfollow","statuses_count":1685,"profile_link_color":"0099B9","notifications":null,"profile_background_image_url":"http://s.twimg.com/a/1268437273/images/themes/theme4/bg.gif","contributors_enabled":false,"geo_enabled":false,"profile_sidebar_fill_color":"95E8EC","url":null,"profile_image_url":"http://a3.twimg.com/profile_images/703836981/123_Kay_normal.jpg","profile_background_tile":false,"protected":false,"profile_sidebar_border_color":"5ED4DC","screen_name":"123_Kay","name":"~GLam DOll GiiqqLez~","verified":false,"followers_count":77,"id":98491606,"utc_offset":-21600,"friends_count":64},"id":10735704604}


S3 example

The S3 example provides for streaming data from the Amazon S3 data storage service. A more robust version of this capability is available in the Splunk Add-on for Amazon Web Services on Splunkbase.


Place the s3.py script in the following location in your Splunk installation:

$SPLUNK_HOME/etc/apps/s3/bin
Note: A script for modular inputs requires an inputs.conf spec file to operate correctly in Splunk Web. Refer to Modular Inputs spec file for information on creating the inputs.conf spec file.

s3.py reads files in various formats and streams data from the files for indexing by Splunk software. Specific areas of interest for modular inputs are the following:

  • Connects to S3 services, providing an Access Key ID and a Secret Access Key
  • Sets up logging to splunkd.log
  • Provides an XML scheme for use by Splunk Manager
  • Provides a --scheme argument that returns the XML scheme for the modular inputs
  • Validates data returned from S3
  • Specifies streaming mode as xml

S3 example script

The following example script, s3.py, provides for streaming data from the Amazon S3 data storage service. s3.py is presented in its entirety below. Refer to Scripts for modular inputs for analysis of specific parts of the script. This script has been made cross-compatible with Python 2 and Python 3 using python-future.

s3.py

from __future__ import division
from __future__ import print_function
from future import standard_library
standard_library.install_aliases()
from builtins import str
from builtins import range
from past.utils import old_div
from builtins import object
import sys, time, os
import http.client, urllib.request, urllib.parse, urllib.error, hashlib, base64, hmac, urllib.parse, md5
import xml.dom.minidom, xml.sax.saxutils
import logging
import tarfile, gzip

# Host used for every S3 request (optionally prefixed with the bucket name).
ENDPOINT_HOST_PORT = "s3.amazonaws.com"

# Set up logging suitable for splunkd consumption: every record is written
# by a StreamHandler (stderr by default) as "<LEVEL> <message>".
formatter = logging.Formatter('%(levelname)s %(message)s')
handler = logging.StreamHandler()
handler.setFormatter(formatter)

# Attach the handler to the root logger and let everything through.
logging.getLogger().setLevel(logging.DEBUG)
logging.getLogger().addHandler(handler)

# XML introspection scheme for this modular input.  It is printed verbatim by
# do_scheme() when the script is invoked with --scheme and declares the
# input's title, description, the xml streaming mode and its three
# arguments: name, key_id and secret_key.
SCHEME = """<scheme>
    <title>Amazon S3</title>
    <description>Get data from Amazon S3.</description>
    <use_external_validation>true</use_external_validation>
    <streaming_mode>xml</streaming_mode>

    <endpoint>
        <args>
            <arg name="name">
                <title>Resource name</title>
                <description>An S3 resource name without the leading s3://.  
                   For example, for s3://bucket/file.txt specify bucket/file.txt.  
                   You can also monitor a whole bucket (for example by specifying 'bucket'),
                   or files within a sub-directory of a bucket
                   (for example 'bucket/some/directory/'; note the trailing slash).
                </description>
            </arg>

            <arg name="key_id">
                <title>Key ID</title>
                <description>Your Amazon key ID.</description>
            </arg>

            <arg name="secret_key">
                <title>Secret key</title>
                <description>Your Amazon secret key.</description>
            </arg>
        </args>
    </endpoint>
</scheme>
"""

def string_to_sign(method, http_date, resource):
    """Build the string that gets signed for a request.

    The Content-MD5, Content-Type and x-amz-header fields of the layout
    below are left empty.
    """
    # "$method\n$contentMD5\n$contentType\n$httpDate\n$xamzHeadersToSign$resource"
    return "%s\n\n\n%s\n%s" % (method, http_date, resource)

def _as_bytes(value):
    """Return *value* as bytes, UTF-8 encoding it when it is text."""
    if isinstance(value, bytes):
        return value
    return str(value).encode("utf-8")

# returns "Authorization" header string
def get_auth_header_value(method, key_id, secret_key, http_date, resource):
    """Return the "AWS <key_id>:<signature>" Authorization header value.

    The signature is the base64-encoded HMAC-SHA1 of string_to_sign(...)
    keyed with *secret_key*.

    hmac.new() requires bytes for both key and message on Python 3, and
    base64.encodestring() was removed in Python 3.9, so both inputs are
    encoded explicitly and base64.b64encode() is used.  For a 20-byte SHA-1
    digest this yields the same text the old encodestring().strip() did.
    """
    to_sign = string_to_sign(method, http_date, resource)
    logging.debug("String to sign=%s" % repr(to_sign))

    digest = hmac.new(_as_bytes(secret_key), _as_bytes(to_sign), hashlib.sha1).digest()
    # decode so the header is text rather than "b'...'" on Python 3
    signature = base64.b64encode(digest).decode("ascii")

    return "AWS %s:%s" % (key_id, signature)

def put_header(conn, k, v):
    """Log header *k*/*v* at debug level, then add it to *conn*."""
    logging.debug("Adding header %s: %s", k, v)
    conn.putheader(k, v)

def gen_date_string():
    """Return the current UTC time as e.g. "Tue, 27 Mar 2007 19:36:42 +0000"."""
    date_format = "%a, %d %b %Y %H:%M:%S +0000"
    # gmtime() with no argument uses the current time, which is what the
    # localtime() -> mktime() -> gmtime() round trip produced.
    return time.strftime(date_format, time.gmtime())

# query_string is expected to have been escaped by the caller
def get_http_connection(key_id, secret_key, bucket, obj, use_bucket_as_host = True, query_string = None):
    """Open a connection and send the headers of a signed GET request.

    With use_bucket_as_host the bucket is prefixed to the endpoint host and
    the request path is "/<obj>"; otherwise the bare endpoint host is used
    and the request path is "/<bucket>/<obj>".  The signed path is always
    "/<bucket>/<obj>".  The returned connection has had endheaders()
    called; reading the response and closing is left to the caller.
    """
    http_method = "GET"
    bucket_host = bucket + "." + ENDPOINT_HOST_PORT
    host = bucket_host if use_bucket_as_host else ENDPOINT_HOST_PORT

    conn = http.client.HTTPConnection(host)
    logging.info("Connecting to %s." % host)
    conn.connect()

    # the path that is signed always carries the bucket name
    path_to_sign = "/" + bucket + "/"
    if obj:
        path_to_sign += obj

    if use_bucket_as_host:
        path_to_request = "/" + obj if obj else "/"
    else:
        path_to_request = path_to_sign

    date_str = gen_date_string()

    # only the path is quoted here; the query string arrives pre-escaped
    request_target = urllib.parse.quote(path_to_request)
    if query_string:
        request_target += query_string
    logging.debug("%s %s" % (http_method, request_target))
    conn.putrequest(http_method, request_target)

    authorization = get_auth_header_value(http_method, key_id, secret_key,
                                          date_str, path_to_sign)
    put_header(conn, "Authorization", authorization)
    put_header(conn, "Date", date_str)
    conn.endheaders()

    return conn

def log_response(resp):
    """Log status and reason of *resp*: DEBUG for 200, ERROR otherwise."""
    status, reason = resp.status, resp.reason
    summary = "status=%s reason=\"%s\"" % (str(status), str(reason))
    log = logging.debug if status == 200 else logging.error
    log(summary)

# parse the amazon error string and extract the message
def get_amazon_error(s):
    """Return the text of the first <Message> element found in *s*.

    Returns "" when *s* is XML without a usable <Message>, and *s* itself
    when it is not well-formed XML.
    """
    try:
        document = xml.dom.minidom.parseString(s)
    except xml.parsers.expat.ExpatError:
        # not XML at all: hand the raw text back to the caller
        return s

    found = document.documentElement.getElementsByTagName("Message")
    if not found:
        return ""
    first = found[0].firstChild
    if first and first.nodeType == first.TEXT_NODE:
        return first.data
    return ""

# prints XML error data to be consumed by Splunk
def print_error_old(s):
    """Write *s* to stdout as the text of a DOM-built <message> document."""
    dom_impl = xml.dom.minidom.getDOMImplementation()
    document = dom_impl.createDocument(None, "message", None)
    document.documentElement.appendChild(document.createTextNode(s))
    serialized = document.toxml()
    sys.stdout.write(serialized)

# prints XML error data to be consumed by Splunk
def print_error(s):
    """Print *s*, XML-escaped, wrapped in <error><message>...</message></error>."""
    escaped = xml.sax.saxutils.escape(s)
    document = "<error><message>%s</message></error>" % escaped
    print(document)

def validate_conf(config, key):
    """Raise an Exception naming *key* unless it is present in *config*."""
    if key in config:
        return
    message = "Invalid configuration received from Splunk: key '%s' is missing." % key
    raise Exception(message)

# read XML configuration passed from splunkd
def get_config():
    """Read the input configuration XML from stdin and return it as a dict.

    The dict holds the stanza name under "name", one entry per <param>
    element that has text content, and "checkpoint_dir".

    Raises:
        Exception: if stdin cannot be parsed, or if any of the required keys
            (name, key_id, secret_key, checkpoint_dir) is missing.  The
            message is prefixed with
            "Error getting Splunk configuration via STDIN: ".
    """
    config = {}

    try:
        # read everything from stdin
        config_str = sys.stdin.read()

        # parse the config XML
        doc = xml.dom.minidom.parseString(config_str)
        root = doc.documentElement

        # getElementsByTagName() returns an empty list for a missing element.
        # Indexing [0] unconditionally turned that into an opaque
        # "list index out of range"; checking first lets the required-key
        # validation below report what is actually missing.
        conf_nodes = root.getElementsByTagName("configuration")
        stanzas = []
        if conf_nodes:
            logging.debug("XML: found configuration")
            stanzas = conf_nodes[0].getElementsByTagName("stanza")

        if stanzas:
            stanza = stanzas[0]
            stanza_name = stanza.getAttribute("name")
            if stanza_name:
                logging.debug("XML: found stanza " + stanza_name)
                config["name"] = stanza_name

                for param in stanza.getElementsByTagName("param"):
                    param_name = param.getAttribute("name")
                    logging.debug("XML: found param '%s'" % param_name)
                    value_node = param.firstChild
                    if param_name and value_node is not None and \
                       value_node.nodeType == value_node.TEXT_NODE:
                        data = value_node.data
                        config[param_name] = data
                        # never write the secret key itself to the log
                        shown = "<redacted>" if param_name == "secret_key" else data
                        logging.debug("XML: '%s' -> '%s'" % (param_name, shown))

        checkpnt_nodes = root.getElementsByTagName("checkpoint_dir")
        if checkpnt_nodes:
            checkpnt_text = checkpnt_nodes[0].firstChild
            if checkpnt_text is not None and \
               checkpnt_text.nodeType == checkpnt_text.TEXT_NODE:
                config["checkpoint_dir"] = checkpnt_text.data

        if not config:
            raise Exception("Invalid configuration received from Splunk.")

        # just some validation: make sure these keys are present (required)
        validate_conf(config, "name")
        validate_conf(config, "key_id")
        validate_conf(config, "secret_key")
        validate_conf(config, "checkpoint_dir")
    except Exception as e:
        raise Exception("Error getting Splunk configuration via STDIN: %s" % str(e))

    return config

def read_from_s3_uri(url):
    """Split an S3 URL into a (bucket, subdir, obj) tuple.

    bucket is the URL's network location, obj is the path without its
    leading slash, and subdir is the part of obj up to and including its
    last "/".  subdir and obj are None when empty.
    """
    parsed = urllib.parse.urlparse(str(url))
    bucket = parsed.netloc
    obj = subdir = None
    if parsed.path:
        obj = parsed.path[1:]  # trim the leading slash
        # everything before the last "/" is the "directory" part
        subdir = obj.rpartition("/")[0]
        if subdir:
            subdir += "/"
    logging.debug("Extracted from url=%s bucket=%s subdir=%s object=%s" % (url, bucket, subdir, obj))
    # normalize empty strings to None
    return (bucket, subdir or None, obj or None)

class HTTPResponseWrapper(object):
    """Holder that keeps a reference to a response object in ``self.resp``."""
    def __init__(self, resp):
        self.resp = resp

def init_stream():
    """Write the opening <stream> tag to stdout (no newline, no flush)."""
    out = sys.stdout
    out.write("<stream>")

def fini_stream():
    """Write the closing </stream> tag to stdout (no newline, no flush)."""
    out = sys.stdout
    out.write("</stream>")

def send_data(source, buf):
    """Write one unbroken <event> carrying *buf* for *source* to stdout.

    Both values are XML-escaped.  On Python 3 a bytes *buf* is decoded as
    UTF-8 first, because xml.sax.saxutils.escape() only accepts text there
    and would raise TypeError.  Undecodable bytes become U+FFFD; this
    includes a multi-byte character that happens to be split across two
    read chunks.
    """
    if sys.version_info[0] >= 3 and isinstance(buf, bytes):
        buf = buf.decode("utf-8", "replace")
    sys.stdout.write("<event unbroken=\"1\"><data>")
    sys.stdout.write(xml.sax.saxutils.escape(buf))
    sys.stdout.write("</data>\n<source>")
    sys.stdout.write(xml.sax.saxutils.escape(source))
    sys.stdout.write("</source></event>\n")

def send_done_key(source):
    """Write an <event> holding the escaped *source* and a <done/> marker."""
    emit = sys.stdout.write
    emit("<event unbroken=\"1\"><source>")
    emit(xml.sax.saxutils.escape(source))
    emit("</source><done/></event>\n")

# returns a list of the object keys reported by one bucket listing request
def get_objs_from_bucket(key_id, secret_key, bucket, subdir = None):
    """Return the <Key> values from a listing of *bucket*.

    When *subdir* is given, the request is restricted with
    "?prefix=<subdir>&delimiter=/".  Exactly one listing request is made;
    no follow-up requests are issued.

    Raises:
        Exception: if the response status is not 200.
    """
    query_string = None
    if subdir:
        query_string = "?prefix=%s&delimiter=/" % urllib.parse.quote(subdir)
    conn = get_http_connection(key_id, secret_key, bucket, obj = None, query_string = query_string)
    try:
        resp = conn.getresponse()
        log_response(resp)
        if resp.status != 200:
            raise Exception("AWS HTTP request return status code %d (%s): %s" % \
                (resp.status, resp.reason, get_amazon_error(resp.read())))
        bucket_listing = resp.read()
    finally:
        # close on the error path too; previously the connection leaked
        conn.close()

    # parse AWS's bucket listing response
    objs = []
    doc = xml.dom.minidom.parseString(bucket_listing)
    root = doc.documentElement

    for key in root.getElementsByTagName("Key"):
        text = key.firstChild
        # an empty <Key/> has no child node; skip it instead of crashing
        if text is not None and text.nodeType == text.TEXT_NODE:
            objs.append(text.data)

    return objs

def get_encoded_file_path(config, url):
    """Return the checkpoint file path for *url*.

    The file name is *url* with every non-alphanumeric character replaced
    by "_", followed by "_" and the MD5 hex digest of *url*; it is joined
    onto config["checkpoint_dir"].
    """
    # encode the URL (simply to make the file name recognizable)
    name = "".join(ch if ch.isalnum() else "_" for ch in url)

    # MD5 the URL.  hashlib is used because the standalone md5 module does
    # not exist on Python 3, and hashing requires bytes there.
    url_bytes = url if isinstance(url, bytes) else url.encode("utf-8")
    name += "_" + hashlib.md5(url_bytes).hexdigest()

    return os.path.join(config["checkpoint_dir"], name)

# returns true if the checkpoint file exists
def load_checkpoint(config, url):
    """Return True if the checkpoint file for *url* can be opened for reading."""
    chk_file = get_encoded_file_path(config, url)
    # try to open this file
    try:
        with open(chk_file, "r"):
            pass
    except (IOError, OSError):
        # assume that this means the checkpoint is not there; a bare except
        # here would also swallow KeyboardInterrupt and SystemExit
        return False
    return True

# simply creates a checkpoint file indicating that the URL was checkpointed
def save_checkpoint(config, url):
    """Create (or truncate) the empty checkpoint file for *url*."""
    chk_file = get_encoded_file_path(config, url)
    logging.info("Checkpointing url=%s file=%s", url, chk_file)
    # opening in "w" mode is enough; the file carries no content
    with open(chk_file, "w"):
        pass

def run():
    """Stream the configured S3 resource to stdout.

    For an object-level URL the object is streamed once and checkpointed.
    For a bucket- or directory-level URL the listing is polled every 60
    seconds and every object without a checkpoint is streamed.

    Fixes over the original: the two log calls whose arguments did not
    match their placeholders now pass them, and the closing </stream> tag
    is written from a ``finally`` (the call after ``while True`` could
    never run).
    """
    config = get_config()
    url = config["name"]
    bucket, subdir, obj = read_from_s3_uri(url)
    key_id = config["key_id"]
    secret_key = config["secret_key"]

    if obj and (not subdir or obj != subdir):
        # object-level URL provided (e.g. s3://bucket/object.txt) that does
        # not appear to be a directory (no ending slash)
        if not load_checkpoint(config, url):
            # there is no checkpoint for this URL: process
            init_stream()
            try:
                request_one_object(url, key_id, secret_key, bucket, obj)
            finally:
                fini_stream()
            # only reached when the object was read without error
            save_checkpoint(config, url)
        else:
            logging.info("URL %s already processed.  Skipping.", url)
    else:
        # bucket-level URL provided (e.g. s3://bucket), or a directory-level
        # URL (e.g. s3://bucket/some/subdir/)
        init_stream()
        try:
            while True:
                logging.debug("Checking for objects in bucket %s" % bucket)
                objs = get_objs_from_bucket(key_id, secret_key, bucket, subdir)
                for o in objs:
                    if subdir and not o.startswith(subdir):
                        logging.debug("obj=%s does not start with %s.  Skipping.", o, subdir)
                        continue
                    obj_url = "s3://" + bucket + "/" + o
                    if not load_checkpoint(config, obj_url):
                        logging.info("Processing %s" % obj_url)
                        request_one_object(obj_url, key_id, secret_key, bucket, o)
                        save_checkpoint(config, obj_url)

                # check every 60 seconds for new entries
                time.sleep(60)
        finally:
            # the loop only ends through an exception; close the stream then
            fini_stream()

def request_one_object(url, key_id, secret_key, bucket, obj):
    """Fetch one object and write its content to stdout as events.

    The response is read through the translator chosen by
    get_data_translator(); each chunk is sent with send_data(), and a done
    key is sent whenever the translator's source changes and once more at
    the end.

    Raises:
        Exception: if the response status is not 200.
    """
    assert bucket and obj

    conn = get_http_connection(key_id, secret_key, bucket, obj)
    try:
        resp = conn.getresponse()
        log_response(resp)
        if resp.status != 200:
            raise Exception("Amazon HTTP request to '%s' returned status code %d (%s): %s" % \
                (url, resp.status, resp.reason, get_amazon_error(resp.read())))

        translator = get_data_translator(url, resp)

        cur_src = ""
        buf = translator.read()
        bytes_read = len(buf)
        while len(buf) > 0:
            # the source changes when an archive moves on to its next member
            if cur_src and translator.source() != cur_src:
                send_done_key(cur_src)
            cur_src = translator.source()
            send_data(translator.source(), buf)
            buf = translator.read()
            bytes_read += len(buf)

        if cur_src:
            send_done_key(cur_src)

        translator.close()
    finally:
        # close on error paths as well; previously the connection leaked
        conn.close()
    sys.stdout.flush()

    logging.info("Done reading. Read bytes=%d", bytes_read)

# Handles file reading from tar archives.  From the tarfile module:
# fileobj must support: read(), readline(), readlines(), seek() and tell().
class TarTranslator(object):
    """Reads the members of an open tar archive one after another.

    Each member is read through the translator that get_data_translator()
    picks for "<src>:<member name>", so nested archives and gzip members
    are unpacked as well.

    Members are fetched with TarFile.next(), which returns None at the end
    of the archive.  The builtin next() does not work here on Python 3:
    TarFile is iterable but is not itself an iterator.
    """

    def __init__(self, src, tar):
        self.tar = tar
        self.base_source = src
        # fallback so source() works even for an archive without members
        self.src = src
        self.member_f = None
        self.translator = None
        self.member = self.tar.next()
        if self.member:
            self._open_member()

    def _open_member(self):
        """Point src/member_f/translator at the current member."""
        self.src = self.base_source + ":" + self.member.name
        self.member_f = self.tar.extractfile(self.member)
        self.translator = None
        # extractfile() gives None for members that have no data to read
        if self.member_f is not None:
            self.translator = get_data_translator(self.src, self.member_f)

    def read(self, sz = 8192):
        """Return the next non-empty chunk, or "" once all members are read."""
        while True:
            # skip forward until a member with readable content is found
            while self.member and self.member_f is None:
                self.member = self.tar.next()
                if self.member:
                    self._open_member()

            if not self.member:
                return "" # done

            buf = self.translator.read(sz)
            if len(buf) > 0:
                return buf
            # current member exhausted: advance on the next iteration
            self.member_f = None
            self.translator = None

    def close(self):
        """Close the underlying tar archive."""
        self.tar.close()

    def source(self):
        """Return "<src>:<member name>" of the member last opened."""
        return self.src

class FileObjTranslator(object):
    """Pass-through translator: serves the content of *fileobj* unchanged."""

    def __init__(self, src, fileobj):
        self.src, self.fileobj = src, fileobj

    def source(self):
        """Return the source name given at construction."""
        return self.src

    def read(self, sz=8192):
        """Read up to *sz* from the wrapped file object."""
        chunk = self.fileobj.read(sz)
        return chunk

    def close(self):
        """Close the wrapped file object, returning what its close() returns."""
        outcome = self.fileobj.close()
        return outcome

class GzipFileTranslator(object):
    """Translator over an already-decompressing file object for gzip data."""

    def __init__(self, src, fileobj):
        self.src, self.fileobj = src, fileobj

    def source(self):
        """Return the source name given at construction."""
        return self.src

    def read(self, sz=8192):
        """Read up to *sz* from the wrapped file object."""
        chunk = self.fileobj.read(sz)
        return chunk

    def close(self):
        """Close the wrapped file object, returning what its close() returns."""
        outcome = self.fileobj.close()
        return outcome

def get_data_translator(url, fileobj):
    """Return the translator matching the file extension of *url*.

    .tar, .tar.gz/.tgz and .tar.bz2 are opened as tar streams, .gz as a
    gzip stream, and anything else is passed through unchanged.
    """
    if url.endswith(".tar"):
        return TarTranslator(url, tarfile.open(None, "r|", fileobj))
    elif url.endswith((".tar.gz", ".tgz")):
        return TarTranslator(url, tarfile.open(None, "r|gz", fileobj))
    elif url.endswith(".tar.bz2"):
        return TarTranslator(url, tarfile.open(None, "r|bz2", fileobj))
    elif url.endswith(".gz"):
        if sys.version_info[0] >= 3:
            # Python 3's GzipFile can read from a file object that has no
            # seek()/tell(), so the public API is sufficient there.
            gz_stream = gzip.GzipFile(fileobj=fileobj, mode="rb")
        else:
            # Python 2's gzip.GzipFile requires tell() and seek(), which
            # our "fileobj" does not supply; fall back to the stream
            # object used by the tarfile module.  This is a private class.
            gz_stream = tarfile._Stream("", "r", "gz", fileobj, tarfile.RECORDSIZE)
        return GzipFileTranslator(url, gz_stream)
    else:
        return FileObjTranslator(url, fileobj)

def do_scheme():
    """Print the introspection scheme (SCHEME) to stdout."""
    scheme_xml = SCHEME
    print(scheme_xml)

def get_validation_data():
    """Parse the validation XML on stdin into a flat dict.

    The name attribute of the first <item> is stored under "stanza"; every
    <param> of that item with a name and text content is stored under its
    own name.  An IndexError propagates if there is no <item> element.
    """
    result = {}

    # read everything from stdin and parse the validation XML
    document = xml.dom.minidom.parseString(sys.stdin.read())
    top = document.documentElement

    logging.debug("XML: found items")
    item = top.getElementsByTagName("item")[0]
    if not item:
        return result

    logging.debug("XML: found item")
    result["stanza"] = item.getAttribute("name")

    for param in item.getElementsByTagName("param"):
        key = param.getAttribute("name")
        logging.debug("Found param %s" % key)
        child = param.firstChild
        if key and child and child.nodeType == child.TEXT_NODE:
            result[key] = child.data

    return result

# make sure that the amazon credentials are good
def validate_arguments():
    """Validate the arguments received as XML on stdin against S3.

    For a directory-style resource the bucket is listed and at least one
    key must start with the directory prefix; otherwise the resource itself
    is requested and must answer with HTTP 200.  On any failure an <error>
    message is printed and the process exits with status 1.

    Reading the XML and looking up the credentials now happen inside the
    ``try`` so that malformed input is reported the same way instead of
    ending in an unhandled traceback.
    """
    try:
        val_data = get_validation_data()
        missing = [k for k in ("key_id", "secret_key") if k not in val_data]
        if missing:
            raise Exception("missing required argument(s): %s" % ", ".join(missing))
        key_id = val_data["key_id"]
        secret_key = val_data["secret_key"]

        url = "s3://" + val_data["stanza"]
        bucket, subdir, obj = read_from_s3_uri(url)
        logging.debug("('%s', '%s', '%s')" % (str(bucket), str(subdir), str(obj)))
        if subdir and subdir == obj:
            # monitoring a "sub-directory" within a bucket
            obj = None

            # see if there are any objects that would match that subdir
            all_objs = get_objs_from_bucket(key_id, secret_key, bucket, subdir)
            matches = False
            for o in all_objs:
                if o.startswith(subdir):
                    matches = True
                    break
            if not matches:
                raise Exception("No objects found inside s3://%s." % "/".join([bucket, subdir]))
        else:
            # use_bucket_as_host = False allows for better error checking:
            # AWS tends to return more helpful error messages
            conn = get_http_connection(key_id, secret_key, bucket, obj, use_bucket_as_host = False)
            resp = conn.getresponse()
            log_response(resp)
            if resp.status // 100 == 3:
                # AWS sometimes answers with a 3xx status when it requires
                # that the bucket is part of the host: retry
                conn = get_http_connection(key_id, secret_key, bucket, obj, use_bucket_as_host = True)
                resp = conn.getresponse()
                log_response(resp)
            if resp.status != 200:
                raise Exception("Amazon returned HTTP status code %d (%s): %s" % (resp.status, resp.reason, get_amazon_error(resp.read())))

    except Exception as e:
        print_error("Invalid configuration specified: %s" % str(e))
        sys.exit(1)

def usage():
    """Print the command-line synopsis and exit with status 2."""
    # The format string was printed without being interpolated, so a
    # literal "%s" showed up in place of the script name.
    print("usage: %s [--scheme|--validate-arguments]" % sys.argv[0])
    sys.exit(2)

def test():
    """Write a small sample stream (three events, one done key) to stdout."""
    init_stream()
    send_data("src1", "test 1")
    send_data("src2", "test 2")
    send_done_key("src2")
    send_data("src3", "test 3")
    # close the <stream> element opened above so the output is well-formed
    fini_stream()

if __name__ == '__main__':
    # Dispatch on the first command-line argument, if any.
    cli_args = sys.argv[1:]
    if not cli_args:
        # just request data from S3
        run()
    elif cli_args[0] == "--scheme":
        do_scheme()
    elif cli_args[0] == "--validate-arguments":
        validate_arguments()
    elif cli_args[0] == "--test":
        test()
    else:
        usage()

    sys.exit(0)

S3 example spec file

Place the following spec file in the following location:

$SPLUNK_HOME/etc/apps/s3/README/inputs.conf.spec

inputs.conf.spec

[s3://<name>]

key_id = <value>
* This is the Amazon key ID.

secret_key = <value>
* This is the secret key.
Last modified on 13 August, 2019
Developer tools for modular inputs   Setting up a scripted input

This documentation applies to the following versions of Splunk Cloud Platform: 8.2.2112, 8.2.2201, 8.2.2202, 8.2.2203, 9.0.2205, 9.0.2208, 9.0.2209, 9.0.2303, 9.0.2305, 9.1.2308, 9.1.2312, 9.2.2403, 9.2.2406 (latest FedRAMP release), 9.3.2408


Was this topic useful?







You must be logged into splunk.com in order to post comments. Log in now.

Please try to keep this discussion focused on the content covered in this documentation topic. If you have a more general question about Splunk functionality or are experiencing a difficulty with Splunk, consider posting a question to Splunkbase Answers.

0 out of 1000 Characters