Могу ли я избежать потокового UDP-сокета в Python, отбрасывающего данные?

Во-первых, я новичок в Python и учусь на работе, так что будьте осторожны!

Я пытаюсь написать потоковое приложение Python для Windows, которое считывает данные из сокета UDP (поток-1), записывает их в файл (поток-2) и отображает данные в реальном времени (поток-3) в виджет (gtk .Image с использованием gtk.gdk.pixbuf). Я использую очереди для передачи данных между потоками.

Моя проблема в том, что если я запускаю только потоки 1 и 3 (так что пока пропустите запись файла), мне кажется, что я потеряю некоторые данные после первых нескольких выборок. После этого капли выглядит нормально. Даже если позволить потоку 1 завершиться перед запуском потока 3, это очевидное падение все еще присутствует.

Приносим извинения за длину фрагмента кода (я удалил поток, который записывает в файл), но я чувствовал, что удаление кода просто вызовет вопросы. Надеюсь, кто-то сможет пролить свет :-)

import socket
import threading
import Queue
import numpy
import gtk
gtk.gdk.threads_init()
import gtk.glade
import pygtk


class readFromUDPSocket(threading.Thread):

    def __init__(self, socketUDP, readDataQueue, packetSize, numScans):
        threading.Thread.__init__(self)
        self.socketUDP = socketUDP
        self.readDataQueue = readDataQueue
        self.packetSize = packetSize
        self.numScans = numScans

    def run(self):
        for scan in range(1, self.numScans + 1):
            buffer = self.socketUDP.recv(self.packetSize)
            self.readDataQueue.put(buffer)
        self.socketUDP.close()
        print 'myServer finished!'


class displayWithGTK(threading.Thread):

    def __init__(self, displayDataQueue, image, viewArea):
        threading.Thread.__init__(self)
        self.displayDataQueue = displayDataQueue
        self.image = image
        self.viewWidth = viewArea[0]
        self.viewHeight = viewArea[1]
        self.displayData = numpy.zeros((self.viewHeight, self.viewWidth, 3), dtype=numpy.uint16)

    def run(self):
        scan = 0
        try:
            while True:
                if not scan % self.viewWidth: scan = 0
                buffer = self.displayDataQueue.get(timeout=0.1)
                self.displayData[:, scan, 0] = numpy.fromstring(buffer, dtype=numpy.uint16)
                self.displayData[:, scan, 1] = numpy.fromstring(buffer, dtype=numpy.uint16)
                self.displayData[:, scan, 2] = numpy.fromstring(buffer, dtype=numpy.uint16)
                gtk.gdk.threads_enter()
                self.myPixbuf = gtk.gdk.pixbuf_new_from_data(self.displayData.tostring(), gtk.gdk.COLORSPACE_RGB,
                                                        False, 8, self.viewWidth, self.viewHeight, self.viewWidth * 3)
                self.image.set_from_pixbuf(self.myPixbuf)
                self.image.show()
                gtk.gdk.threads_leave()
                scan += 1
        except Queue.Empty:
            print 'myDisplay finished!'
            pass


def quitGUI(obj):
    print 'Currently active threads: %s' % threading.enumerate()
    gtk.main_quit()


if __name__ == '__main__':

    # Create socket (IPv4 protocol, datagram (UDP)) and bind to address
    socketUDP = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    host = '192.168.1.5'
    port = 1024
    socketUDP.bind((host, port))

    # Data parameters
    samplesPerScan = 256
    packetsPerSecond = 1200
    packetSize = 512
    duration = 1  # For now, set a fixed duration to log data
    numScans = int(packetsPerSecond * duration)

    # Create array to store data
    data = numpy.zeros((samplesPerScan, numScans), dtype=numpy.uint16)

    # Create queue for displaying from
    readDataQueue = Queue.Queue(numScans)

    # Build GUI from Glade XML file
    builder = gtk.Builder()
    builder.add_from_file('GroundVue.glade')
    window = builder.get_object('mainwindow')
    window.connect('destroy', quitGUI)
    view = builder.get_object('viewport')
    image = gtk.Image()
    view.add(image)
    viewArea = (1200, samplesPerScan)

    # Instantiate & start threads
    myServer = readFromUDPSocket(socketUDP, readDataQueue, packetSize, numScans)
    myDisplay = displayWithGTK(readDataQueue, image, viewArea)

    myServer.start()
    myDisplay.start()

    gtk.gdk.threads_enter()
    gtk.main()
    gtk.gdk.threads_leave()
    print 'gtk.main finished!'


person 666craig    schedule 16.03.2010    source источник


Ответы (5)


UDP не проверяет, что цель получила его (как это делает TCP) - вы должны реализовать повторную передачу и тому подобное в своих приложениях, если вы хотите гарантировать получение всех данных. Вы контролируете источник отправки UDP?

person lunixbochs    schedule 16.03.2010
comment
У меня нет контроля над источником, он просто отправляет дейтаграммы (размером 512 байт) со скоростью 1200 пакетов в секунду. - person 666craig; 17.03.2010
comment
обязательно ли получать все данные? какие данные вы получаете? - person lunixbochs; 17.03.2010

Протокол UDP по определению ненадежен. Вы не должны писать программы, которые ожидают, что дейтаграммы UDP будут всегда проходить.

Пакеты также постоянно отбрасываются в TCP, но вашей программе это не нужно, потому что приложения TCP не могут обрабатывать пакеты; стек TCP показывает вашему приложению поток байтов. Там есть много механизмов, чтобы гарантировать, что если вы отправите байты 'ABCD', вы увидите на конце 'A' 'B' 'C' 'D'. Конечно, вы можете получить любую возможную коллекцию пакетов: «ABC», «D» или «AB», CD и т. Д. Или вы можете просто увидеть «ABC», а затем ничего.

TCP не является «надежным», потому что он волшебным образом может заставить ваши сетевые кабели никогда не выходить из строя или ломаться; гарантия, которую он предоставляет, заключается в том, что до момента, когда поток прерывается, вы будете видеть все в порядке. И после того, как поток прервется, вы ничего не увидите.

В UDP такой гарантии нет. Если вы отправляете четыре дейтаграммы UDP, «AB», «CD», «EF», «GH», вы можете получить их все, или ни одного, или половину из них, или только одну из них. Вы можете получить их в любом порядке. Единственная гарантия, которую пытается предоставить UDP, - это то, что вы не увидите в нем сообщения «ABCD», потому что эти байты находятся в разных дейтаграммах.

Подводя итог: это не имеет ничего общего с Python, потоками или GTK. Это просто базовый факт жизни в сетях, основанный на физической реальности: иногда электрические характеристики ваших проводов не способствуют передаче ваших сообщений по ним.

Вы можете уменьшить сложность своей программы, используя Twisted, в частности listenUDP API, потому что тогда вам не нужно будет жонглировать потоки или их взаимодействие с GTK: вы можете просто вызывать методы прямо в рассматриваемом виджете из вашего datagramReceived метода. Но это не решит вашу основную проблему: UDP просто иногда сбрасывает данные, точка. Реальное решение - убедить ваш источник данных использовать вместо этого TCP.

person Glyph    schedule 26.03.2010
comment
@Lefkowitz, как правило, полезно оставить комментарий о том, почему вы так старались пометить ответ. - person MattH; 27.03.2010

Во-первых; вы можете установить размер буфера recv для сокета? Если это так, установите для него что-то очень большое, так как это позволит стеку UDP буферизовать для вас больше дейтаграмм.

Во-вторых; если вы можете использовать асинхронный ввод-вывод, тогда опубликуйте сразу несколько вызовов recv (опять же, это позволяет стеку обслуживать больше дейтаграмм, прежде чем он начнет их отбрасывать).

В-третьих; вы можете попробовать немного развернуть цикл и прочитать несколько датаграмм, прежде чем помещать их в свою очередь; может ли блокировка очереди вызывать медленную работу потока recv ??

Ну наконец то; дейтаграммы могут быть сброшены где-нибудь в сети, возможно, вы ничего не можете сделать, что U в UDP ...

person Len Holgate    schedule 17.03.2010
comment
Изменение размера буфера для сокета не имело никакого эффекта, но я попробую использовать несколько вызовов recv в цикле. - person 666craig; 17.03.2010
comment
Насколько быстро вы можете обрабатывать датаграммы, можете ли вы обрабатывать их быстрее, чем они приходят? В противном случае стек БУДЕТ отбрасывать дейтаграммы, независимо от того, насколько велики буферы. Что показывает wirehark при запуске с компьютера, на котором запущен код Python? Все ли ожидаемые датаграммы попадают на эту машину? Они могут быть сброшены в сети. Что показывает wirehark, если он запущен на машине, генерирующей дейтаграммы, он может думать, что генерирует 1200 / сек, но исходящий стек может отбрасывать некоторые, и они могут никогда не увидеть провод ... - person Len Holgate; 17.03.2010

Изменить - вычеркнуто предложение "прослушать / принять", спасибо, Даниэль, я как раз собирался удалить его, когда увидел ваш комментарий :)

Я бы предположил, что это проблема сетевого программирования, а не Python как такового.

Вы установили частоту пакетов в секунду и продолжительность, чтобы определить количество вызовов recv, которые вы делаете в свой сокет UDP. Я не вижу listen или accept вызова сокета, я предполагаю, что recv обрабатывает это, поскольку вы говорите, что вы получаете некоторые данные. Вы не упомянули генерацию данных.

Вы определили, сколько чтений вы ожидаете сделать, поэтому я предполагаю, что код делает это много получает перед выходом, поэтому я могу сделать вывод, что вашего recv packetSize недостаточно, и, следовательно, одно чтение не тянет весь дейтаграммы, то последующий recv вытягивает следующую часть предыдущей дейтаграммы.

Разве вы не можете посмотреть на полученные данные и определить, чего не хватает? Какие данные вы «теряете»? Как узнать, что оно потеряно?

Кроме того, вы можете использовать wireshark, чтобы проверить, действительно ли ваш хост получает данные, одновременно с проверкой размера дейтаграмм. Сопоставьте захват с данными, которые предоставляет ваш поток recv.


Обновить

Вы говорите, что теряете данные, но не то, что они есть. Я вижу две возможности потери данных:

  • Усечение пакетов
  • Отбрасывание пакетов

Вы сказали, что размер полезной нагрузки такой же, как и размер, который вы передаете в recv, поэтому я буду считать, что вы не усекаете.

Таким образом, факторы для отбрасывания пакетов - это комбинация скорости получения, скорости чтения из буфера приема и размера буфера приема.

Ваши звонки на Queue.put могут снижать скорость чтения.

Итак, сначала определите, что вы можете читать 1200 пакетов в секунду, изменив readFromUDPSocket на не Queue.put, но посчитайте количество полученных пакетов и сообщите о затраченном времени.

После того, как вы определили, что можете позвонить recv достаточно быстро, следующим шагом будет определение того, что вас замедляет. Я подозреваю, что это может быть ваше использование Queue, я предлагаю группировать полезные данные в группах размера N для размещения на Queue, чтобы вы не пытались вызвать put на частоте 12 Гц.

Поскольку вы хотите поддерживать скорость 1200 операций чтения в секунду, я не думаю, что вы далеко продвинетесь, увеличив буфер приема на сокете.

person MattH    schedule 16.03.2010
comment
Спасибо. Я действительно использую Wireshark (отлично подходит для диагностики сети), и он сообщает размер данных как 512 байт, что является размером пакета, который я использую. - person 666craig; 17.03.2010

Похоже, проблема в источнике. Есть две проблемы:

  1. Если посмотреть на Wireshark, источник не передает последовательно 1200 пакетов в секунду. Возможно, как указал Лен, проблема с отбрасыванием исходящих данных из стека. Кстати, источник - это программируемая карта с портом Ethernet, подключенным к моей машине.

  2. Другая проблема заключается в том, что после первых 15 пакетов данных всегда происходит отбрасывание. Я обнаружил, что если я получу 20 пакетов в части инициализации потока readFromUDPSocket, я смогу нормально прочитать данные, например

class readFromUDPSocket(threading.Thread):

    def __init__(self, socketUDP, readDataQueue, packetSize, numScans):
        threading.Thread.__init__(self)
        self.socketUDP = socketUDP
        self.readDataQueue = readDataQueue
        self.packetSize = packetSize
        self.numScans = numScans
        for i in range(0, 20):
            buffer = self.socketUDP.recv(self.packetSize)

    def run(self):
        for scan in range(1, self.numScans + 1):
            buffer = self.socketUDP.recv(self.packetSize)
            self.readDataQueue.put(buffer)
        self.socketUDP.close()
        print 'myServer finished!'

Не уверен, на что это указывает ?! Я думаю, что все это исключает невозможность быстрого восстановления и тренировок.

person 666craig    schedule 17.03.2010
comment
IMHO, вы должны относиться к этому как к шагу оптимизации и не более того. Вполне возможно, что программа работает так же, как и в реальной ситуации. Вы собираетесь отбросить датаграммы из-за природы UDP. Все, что находится ниже вашего вызова recv (), должно иметь дело с этим, и это следует считать нормальным. ЕСЛИ у вас все получится, тогда отлично. Если нет, довольствуйтесь тем, что вы ДЕЙСТВИТЕЛЬНО получаете, вероятно, поэтому он в первую очередь отправляется как UDP ... То, что вы видите сейчас, возможно, связано с планированием потоков. - person Len Holgate; 17.03.2010