Ошибка Альтаира при сохранении данных в дашборд

Всем привет у меня проблема с альтаиром и вегой при взаимодействии elasticsearch. В деталях произошло следующее: у меня есть шаблон индекса, сгенерированный данными размером 38,8 МБ в порядке 90 тыс. строк. Я хочу использовать этот шаблон индекса, чтобы создать визуализацию, которая позволит мне фильтровать на основе двух значений. Итак, идея заключается в следующем: я хочу представить данные во времени и вставить эту визуализацию в панель инструментов kibana. Я должен иметь возможность взаимодействовать с визуализацией и иметь возможность делать это на основе вставленного фильтра. Мне это удается, в деталях мне удается создать этот график с частью кода ниже: в деталях мы можем увидеть def saveVegaLiteVis и сохранить объект, взятый из ссылка, тогда мы увидим построенную диаграмму, которая работает так, как я хочу. Я хотел бы решить эту проблему, когда данные увеличиваются, потому что данные меньше 3 МБ, поэтому в порядке 10 КБ у меня нет проблем с визуализацией.

def saveVegaLiteVis(client, index, visName, altairChart, resultSize=100, timeField=True):
    chart_json = json.loads(altairChart.to_json())
    chart_json['data']['url'] = {
        "%context%": True,
        "index": index,
        "body": {
            "size": resultSize
        }
    }

    if timeField:
      chart_json['data']['url']['%timefield%'] = "timestamp"

    visState = {
      "type": "vega",
      "aggs": [],
      "params": {
        "spec": json.dumps(chart_json, sort_keys=True, indent=4, separators=(',', ': ')),
      },
      "title": visName
    }

    visSavedObject={
        "visualization" : {
          "title" : visName,
          "visState" : json.dumps(visState, sort_keys=True, indent=4, separators=(',', ': ')),
          "uiStateJSON" : "{}",
          "description" : "",
          "version" : 1,
          "kibanaSavedObjectMeta" : {
            "searchSourceJSON" : json.dumps({
              "query": {
                "language": "kuery",
                "query": ""
              },
              "filter": []
            }),
          }
        },
        "type" : "visualization",
        "references" : [ ],
        "migrationVersion" : {
          "visualization" : "7.7.0"
        },
        "updated_at" : datetime.datetime.now().strftime("%Y-%m-%dT%H:%M:%S.000Z")
    }

    return client.index(index='.kibana',id='visualization:'+visName,body=visSavedObject)

def saveVegaVis(client, index, visName, altairChart, resultSize=100, timeField=True):
    chart_json = json.loads(altairChart.to_json())
    chart_json['spec']['data']['url'] = {
        "%context%": True,
        "index": index,
        "body": {
            "size": resultSize
        }
    }

    if timeField:
      chart_json['spec']['data']['url']['%timefield%'] = "timestamp"

    visState = {
      "type": "vega",
      "aggs": [],
      "params": {
        "spec": json.dumps(chart_json, sort_keys=True, indent=4, separators=(',', ': ')),
      },
      "title": visName
    }

    visSavedObject={
        "visualization" : {
          "title" : visName,
          "visState" : json.dumps(visState, sort_keys=True, indent=4, separators=(',', ': ')),
          "uiStateJSON" : "{}",
          "description" : "",
          "version" : 1,
          "kibanaSavedObjectMeta" : {
            "searchSourceJSON" : json.dumps({
              "query": {
                "language": "kuery",
                "query": ""
              },
              "filter": []
            }),
          }
        },
        "type" : "visualization",
        "references" : [ ],
        "migrationVersion" : {
          "visualization" : "7.7.0"
        },
        "updated_at" : datetime.datetime.now().strftime("%Y-%m-%dT%H:%M:%S.000Z")
    }

    return client.index(index='.kibana',id='visualization:'+visName,body=visSavedObject)
def getDataBasedonIndex(index,idValue,es):
    es.indices.refresh(index=index)
    res = es.get(index=index, id=idValue)
    print(res['_source'])



input_dropdown = alt.binding_select(options=[[1,2,3,4,5],[4],[3],[1],[2],[5],[1,4,5],[3,4,5],[1,2,3,4],[1,3,5],[1,3,4],[1,2,5],[1,2,4],[1,2,3],[4,5],[4,2],[2,3],[1,2],[5,3],[3,4],[2,5],[1,4],[1,5],[1,2],[1,3]])
selection = alt.selection_single(fields=['NUMBER'], bind=input_dropdown, name='FIELD: ')

#dropdown
input_dropdown1 = alt.binding_select(options=[['M3','M4','M5','M6'],['M4'],['M3'],['M6'],['M5']])
selection1 = alt.selection_single(fields=['SHAPE TYPE'], bind=input_dropdown1, name='FIELD2: ')
#shape
selection_Operation= alt.selection_multi(fields=['NUMBER:N'],bind='legend')
shape_Operation = alt.condition(selection_Operation ,alt.Shape('NUMBER:N'), alt.value('lightgray'))

color = alt.condition(selection,alt.Color('SHAPE TYPE:N'), alt.value('lightgray'))

interaction1 = alt.selection_interval(bind='scales', on="[mousedown[event.altKey], mouseup] > mousemove", translate="[mousedown[event.altKey], mouseup] > mousemove!",
    zoom="wheel![event.altKey]"
)

interactionY = alt.selection_interval(bind='scales', encodings=['x'],on="[mousedown[event.shiftKey], mouseup] > mousemove",translate="[mousedown[event.shiftKey], mouseup] > mousemove!",
    zoom="wheel![event.shiftKey]")
ScatterLine=alt.Chart(df).mark_point(filled=True).encode(x=alt.X('@timestamp:T', title='TIMESTAMP'),y=alt.Y('value:Q', title='value'),  color=color,shape=shape_Operation,tooltip = ['value:N','NUMBER:N','SHAPE TYPE:N', alt.Tooltip('@timestamp:T', format = '%Y-%m-%d %H:%M'),'ID:N']
).add_selection(interaction1,interactionY,selection,selection1,selection_Operation).resolve_scale(  x='independent').transform_filter(selection & selection1)

ScatterLine 

saveVegaLiteVis(es, 'index-pattern1', 'RapresentationPoint', ScatterLine, timeField=True)

Как вы могли видеть ниже, что ошибка генерируется только подпрограммой saveVegaLiteVis. Я пытаюсь изменить saveVega, вставляя результатSize=1000000, но в то же время генерирую ошибку ниже. Как я могу это решить? Я хочу быть уверен, что проблема заключается только в том, чтобы сохранить это, чтобы вставить информацию в панель инструментов. Это означает, что сама визуализация работает.

---------------------------------------------------------------------------
timeout                                   Traceback (most recent call last)
/usr/lib/python3/dist-packages/urllib3/connectionpool.py in _make_request(self, conn, method, url, timeout, chunked, **httplib_request_kw)
    420                     # Otherwise it looks like a bug in the code.
--> 421                     six.raise_from(e, None)
    422         except (SocketTimeout, BaseSSLError, SocketError) as e:

/usr/lib/python3/dist-packages/six.py in raise_from(value, from_value)

/usr/lib/python3/dist-packages/urllib3/connectionpool.py in _make_request(self, conn, method, url, timeout, chunked, **httplib_request_kw)
    415                 try:
--> 416                     httplib_response = conn.getresponse()
    417                 except BaseException as e:

/usr/lib/python3.8/http/client.py in getresponse(self)
   1346             try:
-> 1347                 response.begin()
   1348             except ConnectionError:

/usr/lib/python3.8/http/client.py in begin(self)
    306         while True:
--> 307             version, status, reason = self._read_status()
    308             if status != CONTINUE:

/usr/lib/python3.8/http/client.py in _read_status(self)
    267     def _read_status(self):
--> 268         line = str(self.fp.readline(_MAXLINE + 1), "iso-8859-1")
    269         if len(line) > _MAXLINE:

/usr/lib/python3.8/socket.py in readinto(self, b)
    668             try:
--> 669                 return self._sock.recv_into(b)
    670             except timeout:

timeout: timed out

During handling of the above exception, another exception occurred:

ReadTimeoutError                          Traceback (most recent call last)
~/.local/lib/python3.8/site-packages/elasticsearch/connection/http_urllib3.py in perform_request(self, method, url, params, body, timeout, ignore, headers)
    250 
--> 251             response = self.pool.urlopen(
    252                 method, url, body, retries=Retry(False), headers=request_headers, **kw

/usr/lib/python3/dist-packages/urllib3/connectionpool.py in urlopen(self, method, url, body, headers, retries, redirect, assert_same_host, timeout, pool_timeout, release_conn, chunked, body_pos, **response_kw)
    718 
--> 719             retries = retries.increment(
    720                 method, url, error=e, _pool=self, _stacktrace=sys.exc_info()[2]

/usr/lib/python3/dist-packages/urllib3/util/retry.py in increment(self, method, url, response, error, _pool, _stacktrace)
    375             # Disabled, indicate to re-raise the error.
--> 376             raise six.reraise(type(error), error, _stacktrace)
    377 

/usr/lib/python3/dist-packages/six.py in reraise(tp, value, tb)
    702                 raise value.with_traceback(tb)
--> 703             raise value
    704         finally:

/usr/lib/python3/dist-packages/urllib3/connectionpool.py in urlopen(self, method, url, body, headers, retries, redirect, assert_same_host, timeout, pool_timeout, release_conn, chunked, body_pos, **response_kw)
    664             # Make the request on the httplib connection object.
--> 665             httplib_response = self._make_request(
    666                 conn,

/usr/lib/python3/dist-packages/urllib3/connectionpool.py in _make_request(self, conn, method, url, timeout, chunked, **httplib_request_kw)
    422         except (SocketTimeout, BaseSSLError, SocketError) as e:
--> 423             self._raise_timeout(err=e, url=url, timeout_value=read_timeout)
    424             raise

/usr/lib/python3/dist-packages/urllib3/connectionpool.py in _raise_timeout(self, err, url, timeout_value)
    329         if isinstance(err, SocketTimeout):
--> 330             raise ReadTimeoutError(
    331                 self, url, "Read timed out. (read timeout=%s)" % timeout_value

ReadTimeoutError: HTTPConnectionPool(host='localhost', port=9200): Read timed out. (read timeout=10)

During handling of the above exception, another exception occurred:

ConnectionTimeout                         Traceback (most recent call last)
<ipython-input-31-45635f0d8635> in <module>
----> 1 saveVegaLiteVis(es, 'index-pattern', 'RapresentationPoint', ScatterLine, timeField=True,resultSize=100000)

<ipython-input-3-984bfed0a208> in saveVegaLiteVis(client, index, visName, altairChart, resultSize, timeField)
     46     }
     47 
---> 48     return client.index(index='.kibana',id='visualization:'+visName,body=visSavedObject)
     49 
     50 def saveVegaVis(client, index, visName, altairChart, resultSize=100, timeField=True):

~/.local/lib/python3.8/site-packages/elasticsearch/client/utils.py in _wrapped(*args, **kwargs)
    166                 if p in kwargs:
    167                     params[p] = kwargs.pop(p)
--> 168             return func(*args, params=params, headers=headers, **kwargs)
    169 
    170         return _wrapped

~/.local/lib/python3.8/site-packages/elasticsearch/client/__init__.py in index(self, index, body, doc_type, id, params, headers)
    404             doc_type = "_doc"
    405 
--> 406         return self.transport.perform_request(
    407             "POST" if id in SKIP_IN_PATH else "PUT",
    408             _make_path(index, doc_type, id),

~/.local/lib/python3.8/site-packages/elasticsearch/transport.py in perform_request(self, method, url, headers, params, body)
    413                         raise e
    414                 else:
--> 415                     raise e
    416 
    417             else:

~/.local/lib/python3.8/site-packages/elasticsearch/transport.py in perform_request(self, method, url, headers, params, body)
    379 
    380             try:
--> 381                 status, headers_response, data = connection.perform_request(
    382                     method,
    383                     url,

~/.local/lib/python3.8/site-packages/elasticsearch/connection/http_urllib3.py in perform_request(self, method, url, params, body, timeout, ignore, headers)
    261                 raise SSLError("N/A", str(e), e)
    262             if isinstance(e, ReadTimeoutError):
--> 263                 raise ConnectionTimeout("TIMEOUT", str(e), e)
    264             raise ConnectionError("N/A", str(e), e)
    265 

ConnectionTimeout: ConnectionTimeout caused by - ReadTimeoutError(HTTPConnectionPool(host='localhost', port=9200): Read timed out. (read timeout=10))

person Andrea Lombardo    schedule 04.06.2021    source источник
comment
Vega-Lite плохо подходит для отображения наборов данных с большим количеством строк. Обычно я вижу замедление при более чем 10 000 или 20 000 строк. Я подозреваю, что 1 000 000 строк просто перегружают систему. Попробуйте отфильтровать до нескольких тысяч результатов, чтобы увидеть, работает ли это; если да, то это подтвердит эту гипотезу.   -  person jakevdp    schedule 04.06.2021
comment
Спасибо за ответ @jakevdp! Странно то, что система может отображать все данные, но когда я пытаюсь сохранить их на панели инструментов, происходит сбой. Проблема похожа на связь, но я не знаю, как решить, потому что мне удается отображать данные, если я печатаю ScatterLine.   -  person Andrea Lombardo    schedule 04.06.2021


Ответы (1)


Мне удалось решить проблему, добавив в параметр es более высокое время запроса.

es = Elasticsearch([{'host': HOST_ADDRESS, 'port': THE_PORT}], timeout=30)

(эта информация взята по другой проблеме link

Чтобы избежать сбоя системы, мне также нужно увеличить дисковое пространство, необходимое для кибаны. Для этого я вставляю "--max-old.space-size=2048" в раздел среды на kibana.yml

person Andrea Lombardo    schedule 05.06.2021