import time

import splunklib.client as client
import splunklib.results as results

# Connect to the Splunk instance.
service = client.connect(
    host='YOUR_SPLUNK_HOST',
    port='YOUR_SPLUNK_PORT',
    username='YOUR_USERNAME',
    password='YOUR_PASSWORD')

# Run a search on Splunk (normal job, paginated retrieval).
searchquery_normal = "search * | head 100"
job = service.jobs.create(searchquery_normal)

# BUG FIX: a freshly created job has no results yet — poll until the
# job is done before paginating, otherwise pages come back empty/partial.
while not job.is_done():
    time.sleep(1)

# Fetch the results page by page.
result_count = 50  # page size
offset = 0
# BUG FIX: the original bound `results = []`, shadowing the imported
# `splunklib.results` module, so `results.ResultsReader(...)` raised
# AttributeError inside the loop. Accumulate under a distinct name.
all_results = []
while True:
    # Fetch one page of search results.
    kwargs_paginate = {'count': result_count, 'offset': offset}
    reader = results.ResultsReader(job.results(**kwargs_paginate))
    records = [record for record in reader]
    all_results.extend(records)
    if not records:
        # An empty page means we have consumed every result.
        break
    offset += result_count

# Print the collected results.
for result in all_results:
    print(result)
Alternatively, using the export endpoint:
import splunklib.client as client
import splunklib.results as results

# Connect to the Splunk instance.
service = client.connect(
    host='YOUR_SPLUNK_HOST',
    port='YOUR_SPLUNK_PORT',
    username='YOUR_USERNAME',
    password='YOUR_PASSWORD')

# Run an export search and display the results using the results reader.
searchquery_export = "search index=_internal"
kwargs_export = {"earliest_time": "-1h",
                 "latest_time": "now",
                 "search_mode": "normal",
                 "output_mode": "json"}
exportsearch_results = service.jobs.export(searchquery_export, **kwargs_export)

# Get the results and display them using the JSONResultsReader.
reader = results.JSONResultsReader(exportsearch_results)
for result in reader:
    if isinstance(result, dict):
        # BUG FIX: the original used Python 2 `print` statements, which are
        # syntax errors on Python 3 (where JSONResultsReader lives).
        print("Result: %s" % result)
    elif isinstance(result, results.Message):
        # Diagnostic messages may be returned in the results.
        print("Message: %s" % result)

# Print whether results are a preview from a running search.
print("is_preview = %s " % reader.is_preview)
However, the number of exported events is somehow capped at roughly 480K–570K per run.
from datetime import datetime, timedelta
import gzip  # BUG FIX: gzip was used below but never imported
import json
import time

from pandas import date_range
import splunklib.client as client
import splunklib.results as results

# Connect to the Splunk instance.
service = client.connect(
    host='YOUR_SPLUNK_HOST',
    port='YOUR_SPLUNK_PORT',
    username='YOUR_USERNAME',
    password='YOUR_PASSWORD')

# Export events day by day, in 15-minute chunks, to work around the
# per-export event cap; each chunk is appended to that day's gzip file.
search_query = "search index=_internal"

for date in date_range('2023-11-10', '2024-02-15'):
    # BUG FIX: `earliest_time` was read before ever being assigned
    # (NameError); start each day at its midnight boundary.
    earliest_time = date
    end_time = date + timedelta(days=1)
    # BUG FIX: the original reused a single 'logs.json.gz', truncating it on
    # every day of the range and keeping only the last day. Write one file
    # per day. Text mode ('wt') lets us write the JSON strings directly
    # instead of failing on str-into-binary ('wb') writes.
    out_path = 'logs-%s.json.gz' % date.strftime('%Y-%m-%d')
    with gzip.open(out_path, 'wt', encoding='utf-8') as gzip_file:
        while earliest_time < end_time:
            latest_time = earliest_time + timedelta(minutes=15)
            kwargs_export = {
                "earliest_time": earliest_time.strftime('%Y-%m-%dT%H:%M:%S.000+00:00'),
                "latest_time": latest_time.strftime('%Y-%m-%dT%H:%M:%S.000+00:00'),
                "search_mode": "normal",
                "output_mode": "json"}
            job = service.jobs.export(search_query, **kwargs_export)
            for result in results.JSONResultsReader(job):
                if isinstance(result, dict):
                    # BUG FIX: the original wrote to an undefined `f`;
                    # the open handle is `gzip_file`.
                    gzip_file.write(json.dumps(result) + '\n')
            earliest_time = latest_time