Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Elastic update for 7+ #1513

Merged
15 commits merged into from
May 18, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,14 @@ matrix:
env: mode=debian
- python: 3.8
env: mode=codestyle
before_install:
- if [[ -v requirements ]]; then curl -s -O https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-7.6.1-amd64.deb; fi
- if [[ -v requirements ]]; then sudo dpkg -i --force-confnew elasticsearch-7.6.1-amd64.deb; fi
- if [[ -v requirements ]]; then sudo sed -i.old 's/-Xms1g/-Xms128m/' /etc/elasticsearch/jvm.options; fi
- if [[ -v requirements ]]; then sudo sed -i.old 's/-Xmx1g/-Xmx128m/' /etc/elasticsearch/jvm.options; fi
- if [[ -v requirements ]]; then echo -e '-XX:+DisableExplicitGC\n-Djdk.io.permissionsUseCanonicalPath=true\n-Dlog4j.skipJansi=true\n-server\n' | sudo tee -a /etc/elasticsearch/jvm.options; fi
- if [[ -v requirements ]]; then sudo chown -R elasticsearch:elasticsearch /etc/default/elasticsearch; fi
- if [[ -v requirements ]]; then sudo systemctl start elasticsearch; fi
install:
- set -e
- if [[ -v requirements ]]; then sudo apt-get install polipo lighttpd; fi
Expand Down Expand Up @@ -51,7 +59,6 @@ script:
services:
- redis-server
- postgresql
- elasticsearch
- mongodb
- rabbitmq
after_success:
Expand Down
14 changes: 6 additions & 8 deletions contrib/elasticsearch/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ pip3 install elasticsearch
```
usage: elasticmapper [-h] --harmonization-file <filepath>
[--harmonization-fallback] [--host <ip>] [--index INDEX]
[--index-type INDEX_TYPE] [--output <filepath>]
[--output <filepath>]

Elastic Mapper tool

Expand All @@ -30,8 +30,6 @@ optional arguments:
harmonization fallback to `text` type
--host <ip> elasticsearch server IP
--index INDEX elasticsearch index
--index-type INDEX_TYPE
elasticsearch index type
--index-template save the mapping as a template for newly-created indices
--output <filepath> write mapping to file
```
Expand All @@ -41,30 +39,30 @@ optional arguments:
#### Send only to Elasticsearch

```
elasticmapper --harmonization-file=intelmq/intelmq/etc/harmonization.conf --index=intelmq --index-type=events --host=127.0.0.1
elasticmapper --harmonization-file=intelmq/intelmq/etc/harmonization.conf --index=intelmq --host=127.0.0.1
```

#### Write only to output file

```
elasticmapper --harmonization-file=intelmq/intelmq/etc/harmonization.conf --index=intelmq --index-type=events --output=/tmp/mapping.txt
elasticmapper --harmonization-file=intelmq/intelmq/etc/harmonization.conf --index=intelmq --output=/tmp/mapping.txt
```

#### Send to Elasticsearch and write to output file
```
elasticmapper --harmonization-file=intelmq/intelmq/etc/harmonization.conf --index=intelmq --index-type=events --output=/tmp/mapping.txt --host=127.0.0.1
elasticmapper --harmonization-file=intelmq/intelmq/etc/harmonization.conf --index=intelmq --output=/tmp/mapping.txt --host=127.0.0.1
```

#### Send to Elasticsearch as a template (see https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-templates.html)

```
elasticmapper --harmonization-file=intelmq/intelmq/etc/harmonization.conf --index=intelmq --index-type=events --host=127.0.0.1 --index-template
elasticmapper --harmonization-file=intelmq/intelmq/etc/harmonization.conf --index=intelmq --host=127.0.0.1 --index-template
```

#### Harmonization fallback

Fall back to the default `text` type in the generated mapping for any field whose type is not recognized.

```
elasticmapper --harmonization-file=intelmq/intelmq/etc/harmonization.conf --index=intelmq --index-type=events --output=/tmp/mapping.txt --host=127.0.0.1 --harmonization-fallback
elasticmapper --harmonization-file=intelmq/intelmq/etc/harmonization.conf --index=intelmq --output=/tmp/mapping.txt --host=127.0.0.1 --harmonization-fallback
```
16 changes: 3 additions & 13 deletions contrib/elasticsearch/elasticmapper
Original file line number Diff line number Diff line change
Expand Up @@ -62,18 +62,15 @@ def mapping_properties_from_harmonization(harmonization, replacement_char):
return __mapping_properties_from_harmonization(properties), err


def create_mapping(harmonization, index_type, replacement_char):
def create_mapping(harmonization, replacement_char):

config = {"enabled": False}

properties, err = mapping_properties_from_harmonization(harmonization, replacement_char)

data = {
"mappings": {
index_type: {
"_all": config,
"properties": properties
}
"properties": properties
}
}

Expand Down Expand Up @@ -176,13 +173,6 @@ if __name__ == "__main__":
required=False,
help='elasticsearch index name, or template name if using a template')

parser.add_argument('--index-type',
action="store",
dest="index_type",
default="events",
required=False,
help='elasticsearch index type')

parser.add_argument('--index-template',
action="store_true",
dest="index_template",
Expand All @@ -209,7 +199,7 @@ if __name__ == "__main__":
with open(arguments.harmonization_file) as fp:
harmonization = json.load(fp)

data, err = create_mapping(harmonization, arguments.index_type, arguments.replacement_char)
data, err = create_mapping(harmonization, arguments.replacement_char)

if err:
if arguments.harmonization_fallback:
Expand Down
1 change: 0 additions & 1 deletion docs/Bots.md
Original file line number Diff line number Diff line change
Expand Up @@ -2510,7 +2510,6 @@ This output bot discards all incoming messages.
'weekly' --> intelmq-2018-42
'monthly' --> intelmq-2018-02
'yearly' --> intelmq-2018
* `elastic_doctype`: Elasticsearch document type for the event. Default: events
* `http_username`: HTTP basic authentication username
* `http_password`: HTTP basic authentication password
* `use_ssl`: Whether to use SSL/TLS when connecting to Elasticsearch. Default: False
Expand Down
2 changes: 1 addition & 1 deletion intelmq/bots/outputs/elasticsearch/REQUIREMENTS.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
elasticsearch>=5.0.0,<6.0.0
elasticsearch>=7.0.0,<8.0.0
3 changes: 0 additions & 3 deletions intelmq/bots/outputs/elasticsearch/output.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,6 @@ def init(self):
'ssl_ca_certificate', None)
self.ssl_show_warnings = getattr(self.parameters,
'ssl_show_warnings', True)
self.elastic_doctype = getattr(self.parameters,
'elastic_doctype', 'events')
self.replacement_char = getattr(self.parameters,
'replacement_char', None)
self.flatten_fields = getattr(self.parameters,
Expand Down Expand Up @@ -126,7 +124,6 @@ def process(self):
replacement=self.replacement_char)

self.es.index(index=self.get_index(event_dict, default_date=datetime.today().date()),
doc_type=self.elastic_doctype,
body=event_dict)
self.acknowledge_message()

Expand Down
92 changes: 40 additions & 52 deletions intelmq/tests/bots/outputs/elasticsearch/test_output.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,26 +11,28 @@
if os.environ.get('INTELMQ_TEST_DATABASES'):
import elasticsearch

INPUT1 = {"__type": "Event",
"classification.type": "infected-system",
"source.asn": 64496,
"source.ip": "192.0.2.1",
"feed.name": "Example Feed",
"extra": '{"foo.bar": "test"}'
}
OUTPUT1 = {'classification.type': 'infected-system',
'extra.foo.bar': 'test',
'feed.name': 'Example Feed',
'source.asn': 64496,
'source.ip': '192.0.2.1',
}
INPUT1 = {
"__type": "Event",
"classification.type": "infected-system",
"source.asn": 64496,
"source.ip": "192.0.2.1",
"feed.name": "Example Feed",
"extra": '{"foo.bar": "test"}'
}
OUTPUT1 = {
'classification.type': 'infected-system',
'extra.foo.bar': 'test',
'feed.name': 'Example Feed',
'source.asn': 64496,
'source.ip': '192.0.2.1',
}
OUTPUT1_REPLACEMENT_CHARS = {
'classification_type': 'infected-system',
'extra_foo_bar': 'test',
'feed_name': 'Example Feed',
'source_asn': 64496,
'source_ip': '192.0.2.1',
}
}
ES_SEARCH = {
"query": {
"constant_score": {
Expand All @@ -56,26 +58,24 @@

SAMPLE_TEMPLATE = {
"mappings": {
"events": {
"properties": {
"time.observation": {
"type": "date"
},
"time.source": {
"type": "date"
},
"classification.type": {
"type": "keyword"
},
"source.asn": {
"type": "integer"
},
"feed.name": {
"type": "text"
},
"source.ip": {
"type": "ip"
}
"properties": {
"time.observation": {
"type": "date"
},
"time.source": {
"type": "date"
},
"classification.type": {
"type": "keyword"
},
"source.asn": {
"type": "integer"
},
"feed.name": {
"type": "text"
},
"source.ip": {
"type": "ip"
}
}
},
Expand Down Expand Up @@ -132,19 +132,11 @@ def test_event(self):
self.run_bot()
time.sleep(1) # ES needs some time between inserting and searching
result = self.con.search(index='intelmq', body=ES_SEARCH)['hits']['hits'][0]
self.con.delete(index='intelmq', doc_type='events', id=result['_id'])
self.con.delete(index='intelmq',
# doc_type='events',
id=result['_id'])
self.assertDictEqual(OUTPUT1, result['_source'])

def test_raise_when_no_template(self):
"""
Test that a bot raises a RuntimeError if 'rotate_index' is set, but a matching template doesn't exist in ES.
"""
self.sysconfig = {"flatten_fields": "extra",
"elastic_index": "intelmq",
"elastic_doctype": "events",
"rotate_index": "daily"}
self.assertRaises(RuntimeError, self.run_bot)

def test_get_event_date(self):
"""
Test whether get_event_date detects the time.source and time.observation fields in an event.
Expand All @@ -160,7 +152,6 @@ def test_replacement_characters(self):
"""
self.sysconfig = {"flatten_fields": "extra",
"elastic_index": "intelmq",
"elastic_doctype": "events",
"replacement_char": "_",
"rotate_index": "never"}
self.run_bot()
Expand All @@ -169,7 +160,6 @@ def test_replacement_characters(self):
body=ES_SEARCH_REPLACEMENT_CHARS)['hits']['hits'][0]

self.con.delete(index=self.sysconfig.get('elastic_index'),
doc_type=self.sysconfig.get('elastic_doctype'),
id=result['_id'])

self.assertDictEqual(OUTPUT1_REPLACEMENT_CHARS, result['_source'])
Expand All @@ -180,7 +170,6 @@ def test_index_detected_from_time_source(self):
"""
self.sysconfig = {"flatten_fields": "extra",
"elastic_index": "intelmq",
"elastic_doctype": "events",
"rotate_index": "daily"}
expected_index_name = "{}-1869-12-02".format(self.sysconfig.get('elastic_index'))
self.base_check_expected_index_created(INPUT_TIME_SOURCE, expected_index_name)
Expand All @@ -192,7 +181,6 @@ def test_index_detected_from_time_observation(self):
"""
self.sysconfig = {"flatten_fields": "extra",
"elastic_index": "intelmq",
"elastic_doctype": "events",
"rotate_index": "daily"}
expected_index_name = "{}-2020-02-02".format(self.sysconfig.get('elastic_index'))
self.base_check_expected_index_created(INPUT_TIME_OBSERVATION, expected_index_name)
Expand All @@ -206,13 +194,13 @@ def test_index_falls_back_to_default_date(self):

self.sysconfig = {"flatten_fields": "extra",
"elastic_index": "intelmq",
"elastic_doctype": "events",
"rotate_index": "daily"}

class FakeDateTime(datetime):
"""
Passed to bot to force expected datetime value for test.
"""

@classmethod
def today(cls):
return datetime.strptime('2018-09-09T01:23:45+00:00', '%Y-%m-%dT%H:%M:%S+00:00')
Expand All @@ -232,7 +220,6 @@ def test_index_falls_back_to_default_string(self):

self.sysconfig = {"flatten_fields": "extra",
"elastic_index": "intelmq",
"elastic_doctype": "events",
"rotate_index": "daily"}

self.prepare_bot()
Expand All @@ -257,7 +244,8 @@ def base_check_expected_index_created(self, input_event, expected_index_name):
result_index_name = result["_index"]

# Clean up test event and check that the index name was set correctly
self.con.delete(index=result_index_name, doc_type=self.sysconfig.get('elastic_doctype'), id=result['_id'])
self.con.delete(index=result_index_name,
id=result['_id'])
self.assertEqual(result_index_name, expected_index_name)


Expand Down