|
| 1 | +import datetime |
1 | 2 | from .dojo_test_case import DojoTestCase
|
2 | 3 |
|
3 | 4 | from dojo.endpoint.utils import endpoint_get_or_create
|
4 |
| -from dojo.models import Endpoint |
| 5 | +from dojo.models import Product_Type, Product, Engagement, Test, Finding, Endpoint, Endpoint_Status |
5 | 6 | from django.core.exceptions import ValidationError
|
| 7 | +from django.apps import apps |
| 8 | +from django.utils import timezone |
| 9 | +from dojo.endpoint.utils import remove_broken_endpoint_statuses |
6 | 10 |
|
7 | 11 |
|
8 | 12 | class TestEndpointModel(DojoTestCase):
|
@@ -149,3 +153,95 @@ def test_get_or_create(self):
|
149 | 153 | port=8443
|
150 | 154 | )
|
151 | 155 | self.assertTrue(created7)
|
| 156 | + |
| 157 | + |
| 158 | +# TODO: These tests can be skipped in the future (when Endpoint_Status.{finding,endpoint}(null=False,blank=False)) |
| 159 | +# @skip("Outdated - this class was testing clean-up broken entries in old version of model; new version of model doesn't to store broken entries") |
| 160 | +class TestEndpointStatusBrokenModel(DojoTestCase): |
| 161 | + |
| 162 | + def test_endpoint_status_broken(self): |
| 163 | + |
| 164 | + self.prod_type = Product_Type.objects.create() |
| 165 | + self.product = Product.objects.create(prod_type=self.prod_type) |
| 166 | + self.engagement = Engagement.objects.create( |
| 167 | + product=self.product, |
| 168 | + target_start=datetime.datetime(2020, 1, 1, tzinfo=timezone.utc), |
| 169 | + target_end=datetime.datetime(2022, 1, 1, tzinfo=timezone.utc) |
| 170 | + ) |
| 171 | + self.test = Test.objects.create( |
| 172 | + engagement=self.engagement, |
| 173 | + target_start=datetime.datetime(2020, 1, 1, tzinfo=timezone.utc), |
| 174 | + target_end=datetime.datetime(2022, 1, 1, tzinfo=timezone.utc), |
| 175 | + test_type_id=1 |
| 176 | + ) |
| 177 | + from django.contrib.auth import get_user_model |
| 178 | + user = get_user_model().objects.create().pk |
| 179 | + self.finding = Finding.objects.create(test=self.test, reporter_id=user).pk |
| 180 | + self.endpoint = Endpoint.objects.create(protocol='http', host='foo.bar.eps').pk |
| 181 | + self.another_finding = Finding.objects.create(test=self.test, reporter_id=user).pk |
| 182 | + self.another_endpoint = Endpoint.objects.create(protocol='http', host='bar.foo.eps').pk |
| 183 | + self.endpoint_status = { |
| 184 | + 'standard': Endpoint_Status.objects.create( |
| 185 | + date=datetime.datetime(2021, 3, 1, tzinfo=timezone.utc), |
| 186 | + last_modified=datetime.datetime(2021, 4, 1, tzinfo=timezone.utc), |
| 187 | + mitigated=False, |
| 188 | + finding_id=self.finding, |
| 189 | + endpoint_id=self.endpoint |
| 190 | + ).pk, |
| 191 | + 'removed_endpoint': Endpoint_Status.objects.create( |
| 192 | + date=datetime.datetime(2021, 2, 1, tzinfo=timezone.utc), |
| 193 | + last_modified=datetime.datetime(2021, 5, 1, tzinfo=timezone.utc), |
| 194 | + mitigated=True, |
| 195 | + finding_id=self.another_finding, |
| 196 | + endpoint_id=None |
| 197 | + ).pk, |
| 198 | + 'removed_finding': Endpoint_Status.objects.create( |
| 199 | + date=datetime.datetime(2021, 2, 1, tzinfo=timezone.utc), |
| 200 | + last_modified=datetime.datetime(2021, 5, 1, tzinfo=timezone.utc), |
| 201 | + mitigated=True, |
| 202 | + finding_id=None, |
| 203 | + endpoint_id=self.another_endpoint |
| 204 | + ).pk, |
| 205 | + } |
| 206 | + |
| 207 | + Finding.objects.get(id=self.finding).endpoint_status.add( |
| 208 | + Endpoint_Status.objects.get(id=self.endpoint_status['standard']) |
| 209 | + ) |
| 210 | + Finding.objects.get(id=self.another_finding).endpoint_status.add( |
| 211 | + Endpoint_Status.objects.get(id=self.endpoint_status['removed_endpoint']) |
| 212 | + ) |
| 213 | + |
| 214 | + Endpoint.objects.get(id=self.endpoint).endpoint_status.add( |
| 215 | + Endpoint_Status.objects.get(id=self.endpoint_status['standard']) |
| 216 | + ) |
| 217 | + Endpoint.objects.get(id=self.another_endpoint).endpoint_status.add( |
| 218 | + Endpoint_Status.objects.get(id=self.endpoint_status['removed_finding']) |
| 219 | + ) |
| 220 | + |
| 221 | + remove_broken_endpoint_statuses(apps) |
| 222 | + |
| 223 | + with self.subTest('Stadnard eps for finding'): |
| 224 | + f = Finding.objects.filter(id=self.finding) |
| 225 | + self.assertEqual(f.count(), 1) |
| 226 | + f = f.first() |
| 227 | + self.assertEqual(f.endpoint_status.count(), 1) |
| 228 | + self.assertEqual(f.endpoint_status.first().pk, self.endpoint_status['standard']) |
| 229 | + |
| 230 | + with self.subTest('Broken eps for finding'): |
| 231 | + f = Finding.objects.filter(id=self.another_finding) |
| 232 | + self.assertEqual(f.count(), 1) |
| 233 | + f = f.first() |
| 234 | + self.assertEqual(f.endpoint_status.count(), 0) |
| 235 | + |
| 236 | + with self.subTest('Stadnard eps for endpoint'): |
| 237 | + e = Endpoint.objects.filter(id=self.endpoint) |
| 238 | + self.assertEqual(e.count(), 1) |
| 239 | + e = e.first() |
| 240 | + self.assertEqual(e.endpoint_status.count(), 1) |
| 241 | + self.assertEqual(e.endpoint_status.first().pk, self.endpoint_status['standard']) |
| 242 | + |
| 243 | + with self.subTest('Broken eps for endpoint'): |
| 244 | + e = Endpoint.objects.filter(id=self.another_endpoint) |
| 245 | + self.assertEqual(e.count(), 1) |
| 246 | + e = e.first() |
| 247 | + self.assertEqual(e.endpoint_status.count(), 0) |
0 commit comments