Calificación:
  • 0 voto(s) - 0 Media
  • 1
  • 2
  • 3
  • 4
  • 5
sockets TCP - desbordamiento de buffer
#1
hola,

estoy analizando en profundidad los sockets con datos muy grandes. En concreto son imagenes.

El esquema es: Un emisor de graficos, multiples clientes, un servidor de sockets TCP que hace broadcast.

el problema es que el emisor genera datos más rapidamente que los clientes los consumen, provocando que el buffer sellena totalmente y se bloquea.

La unica manera que he encontrado es establecer una demora en el emisor de 2 segundos para cada envio de datos.

¿hay alguna posibilidad de hacerlo de otra manera? Alo que en ingles llaman un "flush" del bufer

La parte del emisor ahora está asi:


Código:
while True:
   # obtenemos la pantalla
   imagen = ImageGrab.grab()
   salida = io.BytesIO()
           
   # grabamos la imagen en el fichero de memoria
   imagen.save(salida, format="PNG")
   msg = struct.pack('>I', len(salida.getvalue())) + salida.getvalue()

   # enviamos el mensaje
   self.sock.sendall(msg)

   #hacemos tiempo para enviar el siguiente "pantallazo"
   time.sleep(2)

y, ciertamente, esto me parece horrible

La parte del cliente/receptor se ve asi:

Código:
     while recepcion:    # recepcion = True   not recepcion = False
           data = None
           diccionario = {}
           # Recupera los primeros 4 bytes del mensaje
           tot_len = 0
           while tot_len < self.RECV_MSG_LEN:
               msg_len = self.sock.recv(self.RECV_MSG_LEN)
               tot_len += len(msg_len)
           # Si el mensaje tiene los 4 bytes que representan la longitud ...
           if msg_len:
               data = b''
               # Desempaqueta el mensaje y obtiene la longitud del mensaje
               msg_len = struct.unpack('>I', msg_len)[0]
               tot_data_len = 0
               while tot_data_len < msg_len:
                   # Recupera el fragmento del tamaño máximo de RECV_BUFFER
                   chunk = self.sock.recv(self.RECV_BUFFER)
                   # Si no hay el pedazo esperado ...
                   if not chunk:
                       data = None
                       break # ... Simplemente sale del bucle
                   else:
                       # Une el contenido de los pedazos
                       data += chunk
                       tot_data_len += len(chunk)
Responder
#2
Hola. ¿Qué significa que el buffer se llene y se bloquee? Se supone que sendall() no retorna hasta enviar toda la información, a menos que lance una excepción.
Responder
#3
el problema es con el buffer del cliente. Se llena antes de procesar los primeros datos.

por ejemplo en la primera vuelta del bucle del cliente recibe 123000 bytes (correcto), y va bien. Pero en la siguiente vuelta del bucle, me dice que lo que va a procesar son 4 millones y pico de pytes ; y mientras tanto el emisor ya ha hecho varios envios (entre la primera y la segunda vuelta del cliente).

el select.select del servidor lo tengo con un timeout de 60 segs. Asi:

para_leer, para_escribir, en_error = select.select(self.conexiones, [ ], [ ], 60)

tambien he probado con 1 seg. de timeout, pero no se observa nada especial. Es lo unico que tengo un poco peculiar, para que me maneje las conexiones activas.
Responder
#4
No está mal usar sleep() para evitar enviar frames innecesarios que ni siquiera llegarán a percibirse (también se hace en el desarrollo de videojuegos o simulaciones 2D/3D). Es una forma de ahorrar recursos y optimizar el rendimiento. De lo contrario el bucle se va ejecutar en forma dispareja dependiendo del hardware en el que esté corriendo (en algunas computadoras más rápido, en otras más lento). Incluso podrías usar zlib o alguna otra librería para comprimir las imágenes que estás enviando y así ahorrar varios datos de red.

Saludos!
Responder
#5
gracias.
usaré zlib en su modo más comprimido para probar (quizas me ahorre algun segundo de sleep) : zlib.compress(datos,9), aunque segun el manual el nivel 6 es adecuado Z_DEFAULT_COMPRESSION
Responder
#6
Bueno... ya está solucionada la transicion suave entre frame y frame.
Le he añadido una lista donde se van almacenando los frames secuencialmente, y a la vez se saca y envia el frame[0]. En mi red local solo hay una demora de 1-2 frames.
Aqui pongo tal como queda el código:
Código:
import io, socket, struct, zlib, pickle, time
import threading

from PIL import Image, ImageGrab


_HOST = '127.0.0.1'  # define el host como "localhost"
_PORT = 9090         # define el puerto como "9090"

class Profesor:
   sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
   lista_frames = []
   
   def __init__(self):
       self.sock.connect((_HOST, _PORT))
       self.enviaPantalla()
       
   def enviaPantalla(self):
       while True:
           # obtenemos la pantalla
           imagen = ImageGrab.grab()
           salida = io.BytesIO()
           
           # grabamos la imagen en el fichero de memoria
           imagen.save(salida, format="PNG")
           # obtenemos el mensaje a enviar
           msg = struct.pack('>I', len(salida.getvalue())) + salida.getvalue()
           
           hilo_graba = threading.Thread(target=self.anadeFrame, args = (msg, ))
           hilo_graba.start()
           hilo_graba.join()
           
           hilo_lee = threading.Thread(target=self.leeFrame)
           hilo_lee.start()
           hilo_lee.join()
           
   def anadeFrame(self, datos):
       self.lista_frames.append(datos)
       
       
   def leeFrame(self):
       if len(self.lista_frames) > 0 :
           self.sock.sendall(self.lista_frames[0])
           print(60, len(self.lista_frames[0]))
           self.lista_frames.pop(0)
       time.sleep(2)
       
       
if __name__ == '__main__' :
   pantalla = Profesor()

aunque de todas maneras me voy a mirar el opencv
Responder
#7
a ver... sigo teniendo problemas con los dichosos bufferes.

Si arranco primero el cliente, en el lado del emisor no hace falta hacer retardo (sleep); porque desde el primer momento, el servidor intermedio suelta el flujo TCP que tiene en ese momento ( y que es solamente un envio, cada vez)
Si arranco al reves (primero el emisor y despues el cliente), el servidor no tiene donde soltar los flujos que le van llegando del emisor.

El servidor envía asi (a todos los sockets, excepto al emisor y el del propio servidor):

if is_not_el_servidor and is_not_el_cliente_remitente:
try :
self._send(sock, client_message)

Por lo tanto, cuando se conecta el primer cliente, el servidor le "escupe" todo lo que tiene en su memoria (¿tarjeta de red?), lo que resulta inmanejable.

¿hay alguna manera en la que el servidor pueda deshacerse de lo que no puede entregar?

Otra cosa, que os puede servir. He descubierto esta libreria, que es mucho más rápida que PIL/pillow:

https://github.com/BoboTiG/python-mss
Responder
#8
No me queda bien en claro cuántos clientes o servidores hay y quién hace cada cosa, ¿cuál sería el servidor intermedio? Por otro lado, deberías simplemente no enviar (o directamente no capturar) la pantalla hasta tanto no se haya establecido la conexión.
Responder
#9
La transmision es unidireccional :  proveedor/emisor -->  servidor  --> cliente/visor
De todos modos estoy reescribiendo todo a datagramas UDP, porque realmente no importa si se pierde algun frame que otro.

te pongo los codigos en TCP:

cliente :

Código:
import socket, io, threading, os, pickle, base64, json
import struct, time, zlib

from tkinter import *

from PIL import Image, ImageTk


class RecibirEscritorio:
   a = 0
   hilos = []
   palabra = ''
   
   _HOST = '127.0.0.1'
   _PORT = 9090
   
   #MAX_WAITING_CONNECTIONS = 10
   RECV_BUFFER = 4096
   RECV_MSG_LEN = 4
   
   sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
   
   def __init__(self, raiz, ancho_x, alto_y):
       self.sock.connect((self._HOST, self._PORT))
       self.raiz = raiz
       # estas medidas serán las mismas que el programa emisor
       # se utilizarán como coordenadas para los clicks, frames, etc.
       self.ancho_x = ancho_x
       self.alto_y = alto_y
       # medias del emisor
       self.widthemisor = 0
       self.heigthemisor = 0
       # para habilitar/deshabilitar raton y teclado
       self.raton_cliente = False
       # Esto lo usaremos para mantener el rastro de un un drag & drop
       # cuando se arrastre (drag). Para un futuro
       self._drag_data = {"x": 0, "y": 0}
       
       # creamos un menu simple de conexion / desconexion
       self.crea_menu()
       
       self.fm_prin = Frame(self.raiz, height=self.alto_y, width=self.ancho_x)
       #self.fm_prin.pack(expand = True, fill = "none")
       self.fm_prin.pack()
       self.lienzo = Canvas(self.fm_prin, width=self.ancho_x, height=self.alto_y)
       self.lienzo.pack(expand = YES, fill = BOTH)
       #self.lienzo.pack()
       #btn_iniciar = Button(self.fm_prin, text = 'Recibir transmisión',\
       #            command = self.cambia)
       #btn_iniciar.pack()
       
       ancho_lienzo = self.lienzo.winfo_width()
       alto_lienzo = self.lienzo.winfo_height()
       #imagen de fondo inicial
       img = Image.open('./iconos/fondo_visor.jpg')
       img = img.resize((self.ancho_x, self.alto_y), Image.ANTIALIAS)
       tkimg = ImageTk.PhotoImage(img)
       self.img_frames = self.lienzo.create_image(ancho_lienzo/2, alto_lienzo/2, anchor=NW, image = tkimg)
       self.lienzo.image = tkimg   # para mantener la referencia
       # ahora ato los botones y teclas al canvas
       #self.raiz.bind("<Button-3>", self.onclick_derecho)
       #self.raiz.bind("<Button-1>", self.onclick_izquierdo)
       #self.raiz.bind("<B1-Motion>", self.raton_drag)
       #self.raiz.bind("<Key>", self.OnKeyboardEvent)

   
   def crea_menu(self):
       #primero asignamos los iconos del menu
       self.icono_trans = PhotoImage(file='./iconos/transmitir.png')
       self.icono_notrans = PhotoImage(file='./iconos/no_transmitir.png')
       self.icono_salir_prog = PhotoImage(file='./iconos/salir_prog.png')
       
       # se inicia con la creacion de la barra del menu
       self.menu_barra = Menu(self.raiz,font="size 10")
       #-----------------------------------
       # primer menu y submenus
       menu1 = Menu(self.menu_barra, tearoff=0)
       
       menu1.add_command(label='Inicia Recepcion transmision', font="size 10",\
               compound='left', image=self.icono_trans, command = self.cambia)
       menu1.add_command(label='Para Transmision Escritorio', font="size 10",\
               compound='left', image=self.icono_notrans, command = self.para_transmision)
               
       menu1.add_separator()
       
       menu1.add_command(label='Salir', font="size 10", \
                   compound='left', image=self.icono_salir_prog, \
                   command=self.salir)
   
       self.menu_barra.add_cascade(label='Menu de Prueba', menu=menu1)
       
       #terminamos el menu
       self.raiz.config(menu=self.menu_barra)
       
       
   # ====== termina la gui =====

   def salir(self):
       self.a = 1
       """
       # termino los threads pendientes
       for hilo in self.hilos:
           try:
               hilo.join()
           except:
               pass
       """
       self.raiz.destroy()
       os._exit(0)
   
   
   def para_transmision(self):
       self.a = 1
       # termino los threads pendientes
       #matar_hilos.clear()
       for hilo in self.hilos:
           try:
               hilo.join()
           except:
               pass
       self.hilos = []
       
   def cambia(self):
       t = threading.Thread(target=self.cargar_imagen)
       self.hilos.append(t)
       t.start()
       
       
   def cargar_imagen(self):
       self.a = 0
       #bloqueo = threading.Lock()
       while self.a == 0:
           """
           Recibe un mensaje entrante del cliente y lo desempaqueta.

           :param sock: el socket entrante
           :return: el mensaje desempaquetado
           """
           #bloqueo.acquire()
           data = None
           diccionario = {}
           # Recupera los primeros 4 bytes del mensaje
           tot_len = 0
           while tot_len < self.RECV_MSG_LEN:
               msg_len = self.sock.recv(self.RECV_MSG_LEN)
               tot_len += len(msg_len)
           # Si el mensaje tiene los 4 bytes que representan la longitud ...
           if msg_len:
               data = b''
               # Desempaqueta el mensaje y obtiene la longitud del mensaje
               msg_len = struct.unpack('>I', msg_len)[0]
               print(msg_len)
               tot_data_len = 0
               while tot_data_len < msg_len:
                   # Recupera el fragmento del tamaño máximo de RECV_BUFFER
                   chunk = self.sock.recv(self.RECV_BUFFER)
                   # Si no hay el pedazo esperado ...
                   if not chunk:
                       data = None
                       break # ... Simplemente sale del bucle
                   else:
                       # Une el contenido de los pedazos
                       data += chunk
                       tot_data_len += len(chunk)
           
           # ahora tenemos que hacer al reves que profesor.py para mostrar la captura de pantalla
           #print(data)
           #datos = zlib.decompress(data)
           image = Image.open(io.BytesIO(data))
           #image.show()
           #break
           tkimg = ImageTk.PhotoImage(image)
           self.lienzo.itemconfig(self.img_frames, image = tkimg)
           self.lienzo.image = tkimg   # para mantener la referencia
           
           


def inicia_cliente():
   NOMBRE_PROGRAMA = 'Visor Recibe Escritorio'
   ancho_x = 600
   alto_y = 600
   
   root = Tk()
   root.geometry(str(ancho_x) + 'x' + str(alto_y) +'+500+1')   # la diferencia es el alo del menu
   root.title(NOMBRE_PROGRAMA)

   #img = PhotoImage(file='./iconos/pantalla.png')
   #root.tk.call('wm', 'iconphoto', root._w, img)

   cierra_win = RecibirEscritorio(root, ancho_x, alto_y)
   root.protocol('WM_DELETE_WINDOW', cierra_win.salir)

   root.mainloop()


if __name__ == '__main__':
   inicia_cliente()


Proveedor:
Código:
import io, socket, struct, zlib, pickle, time
import threading

from PIL import Image, ImageGrab
import mss          # https://github.com/BoboTiG/python-mss
import mss.tools


_HOST = '127.0.0.1'  # define el host como "localhost"
_PORT = 9090         # define el puerto como "9090"
_hilos = []


class Profesor:
   #MAX_WAITING_CONNECTIONS = 10
   #RECV_BUFFER = 4096
   #RECV_MSG_LEN = 4
   
   sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
   emisorFlag = True
   
   def __init__(self):
       # hacer un reusadrees. Falta
       self.sock.connect((_HOST, _PORT))
       #self.enviaPantalla()
       
   def enviaPantalla(self):
       # obtenemos una porcion de la pantalla y la grabamos en salida
       sct = mss.mss()
       # Use the 1st monitor. Me da las medidas de la pantalla
       monitor = sct.monitors[1]
       
       # Capture a bbox using percent values
       left = monitor['left'] + monitor['width'] * 5 // 100  # 5% from the left
       top = monitor['top'] + monitor['height'] * 5 // 100  # 5% from the top
       right = left + 600  # 600px width
       lower = top + 600  # 600px height
       bbox = (left, top, right, lower)
       
       while self.emisorFlag:
           # creamos el espacio donde salvaremos la pantalla
           salida = io.BytesIO()
           """
           # obtenemos una porcion de la pantalla y la grabamos en salida
           with mss.mss() as sct:
               # Use the 1st monitor. Me da las medidas de la pantalla
               monitor = sct.monitors[1]
               
               # Capture a bbox using percent values
               left = monitor['left'] + monitor['width'] * 5 // 100  # 5% from the left
               top = monitor['top'] + monitor['height'] * 5 // 100  # 5% from the top
               right = left + 600  # 600px width
               lower = top + 600  # 600px height
               bbox = (left, top, right, lower)
           """
           # Grab the picture
           # Using PIL would be something like:
           # im = ImageGrab(bbox=bbox)
           im = sct.grab(bbox)

           # Save it! para grabarlo en disco
           # mss.tools.to_png(im.rgb, im.size, output='screenshot.png')
           # Create the Image en PIL
           imagen = Image.frombytes('RGB', im.size, im.rgb)
           
           # grabamos la imagen en el fichero de memoria
           imagen.save(salida, format="PNG")
           
           # creamos el mensaje a enviar
           msg = struct.pack('>I', len(salida.getvalue())) + salida.getvalue()
           # enviamos el mensaje
           self.sock.sendall(msg)
                 
       
       
if __name__ == '__main__' :
   pantalla = Profesor()
   hilo_envia = threading.Thread(target=pantalla.enviaPantalla)
   _hilos.append(hilo_envia)
   hilo_envia.start()
   
   
   # hacer una salida por menu donde _hilos.join() y emisorFlag = False


Servidor:


Código:
import struct
import socket
import select
import threading

_HOST = ''  # define el host en las IPs disponibñes
_PORT = 9090         # define el puerto como "9090"

# ahora definimos el objeto ChatServer como un threading.Thread
# esto significa que tenemos que sobreescribir el metodo run() a fin de
# llamar al metodo start() de la clase madre y hacer que la clase ChatSever
# funcione adecuadamente.
# lo primero definimos tres constantes para el ChatSever:
#
#    MAX_WAITING_CONNECTIONS especifica la cantidad máxima de conexiones en cola antes de que se rechace.
#    RECV_BUFFER especifica el tamaño (en bytes) del buffer de recepcion.
#    RECV_MSG_LEN especifica el tamaño (en bytes) del marcador de posición contenido al comienzo de los mensajes.
"""
¿Por qué necesitamos un marcador de posición al comienzo de cada mensaje?
Si queremos seguir de cerca el enunciado del problema, no necesariamente lo necesitamos.
Pensemos en la situación en la que el cliente está enviando una cantidad considerable de datos.
Digamos que el cliente envía un mensaje de 10000 bytes mientras que nuestro servidor
puede recibir como máximo 4096 bytes a la vez.
Ahora, nos enfrentamos con una decisión: podemos simplemente asumir que el servidor recopila
los primeros 4096 bytes de cada mensaje del cliente o podemos estudiar una solución para
recuperar el mensaje completo de 10000 bytes (y generalmente un mensaje de cualquier longitud).
Hagamos las cosas más difíciles e intentemos implementar una solución que permita la recepción
completa de cada mensaje individual. Como TCP / IP es un protocolo basado en flujos,
necesitamos definir nuestro propio protocolo basado en mensajes sobre TCP / IP para
distinguir el inicio y el final de cada mensaje individual.
Un protocolo fácil y eficaz, sugerido en
   https://stackoverflow.com/questions/17667903/python-socket-receive-large-amount-of-data
y aplicado aquí, espera que el cliente prefija cada mensaje con su longitud.
Entonces, es por eso que presentamos la constante RECV_MSG_LEN.
"""

class ChatServer(threading.Thread):
   """
   Define el servidor chat como un Thread.
   """

   MAX_WAITING_CONNECTIONS = 10
   RECV_BUFFER = 4096
   RECV_MSG_LEN = 4

   def __init__(self, host, port):
       """
       Inicializa un nuevo ChatServer.

       :param host: el host al cual esta vinculado el servidor
       :param port: el puerto al cual esta vinculado el servidor
       """
       threading.Thread.__init__(self)
       self.host = host
       self.port = port
       self.connections = []   # para almacenar todas las conexiones entrantes activas
       self.running = True     # dice si el servidor debe seguir corriendo. Cambia el estado de funcionando a parado y viceversa
       
       
   """
   Definimos la funcion _bind_socket, la cual crea el socket del servidor.
   Lo vincula al host y puerto especificados y escucha a lo sumo __MAX_WAITING_CONNECTIONS entrantes
   """
   def _bind_socket(self):
       """
       Crea el socket servidor y lo iga al host y puerto dados.
       """
       self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
       #self.server_socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, True)
       self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
       self.server_socket.bind((self.host, self.port))
       self.server_socket.listen(self.MAX_WAITING_CONNECTIONS)
       self.connections.append(self.server_socket)
   
   """
   definimos el método de conveniencia _send que prefija un mensaje dado con su
   longitud antes de enviarlo a través de una conexión de socket determinada.
   Será utilizado por el servidor para transmitir un mensaje de un cliente a
   todos los demás clientes conectados.
   """
   def _send(self, sock, msg):
       """
       Prefija cada mensaje con una longitud de 4 bytes antes de enviar.

       :param sock: el socket entrante
       :param msg: el mensaje a enviar
       """
       # Empaqueta el mensaje con 4 bytes en cabecera que representan la longitud del mensaje
       msg = struct.pack('>I', len(msg)) + msg
       # Envia el mensaje empaquetado
       sock.sendall(msg)
       
   """
   Definimos la función _receive que contiene la lógica de implementación de recepción de datos.
   De acuerdo con el protocolo de mensajes que hemos elegido construir a través de TCP / IP,
   cada mensaje del cliente tiene un prefijo de 4 bytes que representa su longitud.
   Entonces, el servidor, a la recepcion de cada mensaje, lo descomprimirá y leerá sus
   primeros 4 bytes para obtener la longitud.
   Una vez que se ha adquirido esta información, el servidor llamará varias veces al
   método recv para obtener el mensaje total.
   """
   def _receive(self, sock):
       """
       Recibe un mensaje entrante del cliente y lo desempaqueta.

       :param sock: el socket entrante
       :return: el mensaje desempaquetado
       """
       data = None
       # Recupera los primeros 4 bytes del mensaje
       tot_len = 0
       while tot_len < self.RECV_MSG_LEN:
           msg_len = sock.recv(self.RECV_MSG_LEN)
           tot_len += len(msg_len)
       # Si el mensaje tiene los 4 bytes que representan la longitud ...
       if msg_len:
           data = b''
           # Desempaqueta el mensaje y obtiene la longitud del mensaje
           msg_len = struct.unpack('>I', msg_len)[0]
           tot_data_len = 0
           while tot_data_len < msg_len:
               # Recupera el fragmento del tamaño máximo de RECV_BUFFER
               chunk = sock.recv(self.RECV_BUFFER)
               # Si no hay el pedazo esperado ...
               if not chunk:
                   data = None
                   break # ... Simplemente sale del bucle
               else:
                   # Une el contenido de los pedazos
                   data += chunk
                   tot_data_len += len(chunk)
       
       return data
   
   """
   Definimos el método _broadcast.
   Dados el socket y el mensaje del cliente entrante, itera sobre las conexiones de
   socket relevantes para distribuir el mensaje a todos los demás clientes conectados
   utilizando el método _send. Además, el método detecta posibles desconexiones del
   cliente y elimina los sockets involucrados de la lista de los sockets activos.
   """
   def _broadcast(self, client_socket, client_message):
       """
       Transmite un mensaje a todos los clientes diferentes tanto del servidor como
       del cliente que envía el mensaje.

       :param client_socket: el socket del cliente que eniva el mensaje
       :param client_message: el mensaje a divulgar publicamente
       """
       for sock in self.connections:
           is_not_the_server = sock != self.server_socket
           is_not_the_client_sending = sock != client_socket
           if is_not_the_server and is_not_the_client_sending:
               try :
                   self._send(sock, client_message)
               except socket.error:
                   # Maneja una posible desconexión del cliente "sock" con ...
                   sock.close()  # cierra la conexion del socket
                   self.connections.remove(sock)  # borra el socket de la lista de comunicaciones activas
       
       
   """
   Definimos el método _run que implementa la lógica general del servidor.
   Siempre que el servidor se ejecute, monitorea continuamente todos los sockets activos
   (contenidos en la lista self.connections) para una actividad legible.
   Este proceso se realiza con Select.select__call. La función __select toma como entrada
   tres listas de sockets que están, respectivamente, esperando para lectura, escritura o
   por una condición excepcional y, a su vez, devuelve las listas de descriptores de sockets
   que sean legibles, escribibles o simplemente han caído en un estado de error .
   La única lista que nos importa es la lista de sockets legibles.
   En este punto, si el socket del servidor se vuelve legible, el servidor aceptará y
   manejará una nueva conexión de cliente.
   Como añadido, el servidor transmitirá a todos los otros clientes conectados un mensaje que
   indica que un nuevo cliente ingresó a la sala de chat.
   Por otro lado, si un socket de cliente se vuelve leible, el servidor adquirirá el mensaje
   entrante del cliente y lo reenviará a todos los demás sockets conectados (excepto el de envío).
   La transmisión se realiza a través de la función _broadcast descrita arriba.
   """
   def _run(self):
       """
       Este es el metodo que corre el servidor.
       """
       while self.running:
           # Obtiene la lista de sockets que estan listos para leerse a traves de llamadas no bloqueantes a select
           # El select tiene un timeout de 60 segundos. Se lo he quitado
           try:
               ready_to_read, ready_to_write, in_error = select.select(self.connections, [], [])   # , 60)
           except socket.error:
               continue
           else:
               for sock in ready_to_read:
                   # Si la instancia del socket es el socket del servidor ...
                   if sock == self.server_socket:
                       try:
                           # Maneja una conexion nueva del cliente
                           client_socket, client_address = self.server_socket.accept()
                       except socket.error:
                           break
                       else:
                           self.connections.append(client_socket)
                           print ("Cliente (%s, %s) conectado " % client_address)

                           # Notifica a todos los clientes conectados que ha ingresado uno nuevo
                           #self._broadcast(client_socket, "\n[%s:%s] entered the chat room\n" % client_address)
                   # ...de otro modo es una conexión de socket de cliente entrante
                   else:
                       try:
                           data = self._receive(sock) # Obtiene el mensaje del cliente...
                           if data:
                               # ... y lo transmite a todos los clientes conectados
                               #self._broadcast(sock, "\r" + '<' + str(sock.getpeername()) + '> ' + data)  # poner a bytes
                               self._broadcast(sock, data)
                       except socket.error:
                           # Emite a todos los clientes conectados que un cliente ha abandonado
                           #self._broadcast(sock, "\nClient (%s, %s) is offline\n" % client_address)
                           #print ("Client (%s, %s) is offline" % client_address)
                           sock.close()
                           self.connections.remove(sock)
                           continue
       # Limpia la conexion socket
       self.stop()
       
   """
   define el método run, que reemplaza automáticamente el método homónimo en la clase madre.
   Simplemente crea y vincula el socket del servidor utilizando la función _bind_socket y luego
   ejecuta el método _run que implementa la lógica del servidor.
   """
   def run(self):
       """
       Dados un host y un puerto, vincula el socket y corre el servidor.
       """
       self._bind_socket()
       self._run()
   
   """
   Definimos el método stop que simplemente establece la variable self.running en False y
   cierra la conexión del socket del servidor. Esto nos permitirá detener nuestro servidor
   una vez que finalice la ejecución de los casos de prueba.
   """
   def stop(self):
       """
       Detiene el servidor configurando el indicador "running" antes de cerrar la conexión de socket.
       """
       self.running = False
       self.server_socket.close()
       
       
"""
definimos y ejecuta la función main.
Simplemente crea un objeto ChatServer y llama a su método start().
"""
def main():
   """
   La funcion principal del programa. Crea y corre una nueva instancia de ChatServer.
   """
   chat_server = ChatServer(_HOST, _PORT)
   chat_server.start()


if __name__ == '__main__':
   """
   El punto de entrada al programa.
   """
   main()
Responder
#10
Al final, ignorando el tema del buffer, he solucionado el problema con una solución alternativa.
Por mi parte doy por cerrado el hilo. Si alguno teneis alguna duda sobre el codigo, preguntad ... xDD

En la parte del emisor:
- Se arranca aqui el servidor con subprocess.popen (escondo la ventana shell del servidor)
- Se crea una lista de usuarios vacía, en la que se iran acumulando los usuarios que se vayan conectando. Mientras haya 1 usuario activo seguirá transmitiendo pantallazos; en caso contrario seguirá en el bucle sin hacer nada (simplemente dando vueltas)

Codigo emisor/proveedor:
Código:
import io, socket, struct, zlib, pickle, time
import threading, subprocess, os

from PIL import Image, ImageGrab
import mss          # https://github.com/BoboTiG/python-mss
import mss.tools


"""
   clase para arrancar el servidor en un subproceso
"""

class Sistema:

   def procesador(self, instruccion):
       #para que no salga la shell (pantalla negra) en windows
       startupinfo = None
       retorno = 0
       if os.name == 'nt':
           startupinfo = subprocess.STARTUPINFO()
           startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW

       with open(os.devnull, 'wb') as devnull:
           p1=subprocess.Popen(
               instruccion,
               startupinfo=startupinfo,
               stdout=devnull,
               stderr=subprocess.STDOUT,
               universal_newlines=True     #por si acaso para el windows
               )
           #p1.wait()  
           # no esperar a que termine ya que el programa es un bucle
           # infinito y no retorna al shell de sistema
           # en nuestro caso como el programa no termina el retorno será None
           retorno = p1.poll()

       return retorno
       
       
   def iniciaServidor(self):
       #corto la instrucción del shell por los espacios
       #y creo una lista/array con cada elemento
       instruccion = [
           'python',
           'servidor_imagenes.py'
           ]
       return self.procesador(instruccion)



"""
   Clase para el emisor de imagenes
"""

_HOST = '127.0.0.1'  # define el host como "localhost"
_PORT = 9090         # define el puerto como "9090"
_hilos = []


class Profesor:
   
   RECV_BUFFER = 4096
   RECV_MSG_LEN = 4
   
   sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
   emisorFlag = True
   listaAlumnos = []
   
   def __init__(self):
       # hacer un reusadrees. Falta
       self.sock.connect((_HOST, _PORT))
       
       
   def enviaPantalla(self):
       # obtenemos una porcion de la pantalla y la grabamos en salida
       sct = mss.mss()
       # Use the 1st monitor. Me da las medidas de la pantalla
       monitor = sct.monitors[1]
       
       # Capture a bbox using percent values
       left = monitor['left'] + monitor['width'] * 5 // 100  # 5% from the left
       top = monitor['top'] + monitor['height'] * 5 // 100  # 5% from the top
       right = left + 600  # 600px width
       lower = top + 600  # 600px height
       bbox = (left, top, right, lower)
       contador = 0
       
       while self.emisorFlag:
           if len(self.listaAlumnos) > 0:
               contador += 1
               # creamos el espacio donde salvaremos la pantalla
               salida = io.BytesIO()
             
               # Grab the picture
               # Using PIL would be something like:
               # im = ImageGrab(bbox=bbox)
               im = sct.grab(bbox)

               # Save it! para grabarlo en disco
               # mss.tools.to_png(im.rgb, im.size, output='screenshot.png')
               # Create the Image en PIL
               imagen = Image.frombytes('RGB', im.size, im.rgb)
               
               # grabamos la imagen en el fichero de memoria
               imagen.save(salida, format="PNG")
               
               # creamos el mensaje a enviar
               #pantalla = struct.pack('>I', len(salida.getvalue())) + salida.getvalue()
               dic_msg = {
                   'tipoMensaje' : 'envioPantalla',
                   'pantalla' : salida.getvalue()
               }
               datos = pickle.dumps(dic_msg)
               msg = struct.pack('>I', len(datos)) + datos
               print(81, len(msg), self.listaAlumnos, contador)
               # enviamos el mensaje
               self.sock.sendall(msg)
               
   
   
   def recibeMensaje(self):
       """
       Recibe un mensaje entrante del cliente y lo desempaqueta.
       Fundamentalmente son altas y bajas de usuarios
       """
       while self.emisorFlag:
           data = None
           dicc = {}
           # Recupera los primeros 4 bytes del mensaje
           tot_len = 0
           while tot_len < self.RECV_MSG_LEN:
               msg_len = self.sock.recv(self.RECV_MSG_LEN)
               tot_len += len(msg_len)
           # Si el mensaje tiene los 4 bytes que representan la longitud ...
           if msg_len:
               data = b''
               # Desempaqueta el mensaje y obtiene la longitud del mensaje
               msg_len = struct.unpack('>I', msg_len)[0]
               print(msg_len)
               tot_data_len = 0
               while tot_data_len < msg_len:
                   # Recupera el fragmento del tamaño máximo de RECV_BUFFER
                   chunk = self.sock.recv(self.RECV_BUFFER)
                   # Si no hay el pedazo esperado ...
                   if not chunk:
                       data = None
                       break # ... Simplemente sale del bucle
                   else:
                       # Une el contenido de los pedazos
                       data += chunk
                       tot_data_len += len(chunk)
           if data:
               dicc = pickle.loads(data)
               
               if dicc['tipoMensaje'] == 'altaCliente':
                   self.listaAlumnos.append(dicc['usuario'])
                   
               if dicc['tipoMensaje'] == 'bajaCliente':
                   try:
                       self.listaAlumnos.remove(dicc['usuario'])
                   except ValueError:
                       pass
                   
                   
       
if __name__ == '__main__' :
   # primero tratamos de iniciar el servidor
   xxx = Sistema()
   result_proceso = xxx.iniciaServidor()
   # si != None el servidor ha terminado, lo cual no es correcto.
   # si == 0, terminacion correcta; otro codigo, terminacion anomala
   if result_proceso is not None:
       os._exit(1)
       
   # ahora vamos con el emisor de imagenes
   pantalla = Profesor()
   
   hilo_recibe = threading.Thread(target=pantalla.recibeMensaje)
   _hilos.append(hilo_recibe)
   hilo_recibe.start()
   
   
   hilo_envia = threading.Thread(target=pantalla.enviaPantalla)
   _hilos.append(hilo_envia)
   hilo_envia.start()
   
   # Se sale por fuerza bruta, por ahora.
   # hacer una salida por menu donde _hilos.join() y emisorFlag = False

En la parte del cliente:
- mientras no esté arranado el servidor, el programa retorna al shell del sistema.
- si el servidor está arrancado, manda un mensaje con su nombre de usuario. El emisor lo añade a su lista de usuarios y empieza a transmitirle.

Codigo del cliente/receptor:
Código:
import socket, io, threading, os, pickle, base64, json
import struct, time, zlib

from tkinter import *

from PIL import Image, ImageTk


class RecibirEscritorio:
   a = 0
   hilos = []
   palabra = ''
   
   _HOST = '127.0.0.1'
   _PORT = 9090
   
   #MAX_WAITING_CONNECTIONS = 10
   RECV_BUFFER = 4096
   RECV_MSG_LEN = 4
   
   sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
   
   def __init__(self, raiz, ancho_x, alto_y):
       self.raiz = raiz
       #self.inicia_conexion()
       # estas medidas serán las mismas que el programa emisor
       # se utilizarán como coordenadas para los clicks, frames, etc.
       self.ancho_x = ancho_x
       self.alto_y = alto_y
       # medias del emisor
       self.widthemisor = 0
       self.heigthemisor = 0
       # para habilitar/deshabilitar raton y teclado
       self.raton_cliente = False
       # Esto lo usaremos para mantener el rastro de un un drag & drop
       # cuando se arrastre (drag). Para un futuro
       self._drag_data = {"x": 0, "y": 0}
       
       # creamos un menu simple de conexion / desconexion
       self.crea_menu()
       
       self.fm_prin = Frame(self.raiz, height=self.alto_y, width=self.ancho_x)
       self.fm_prin.pack(expand = True, fill = "none")
       #self.fm_prin.pack()
       
       self.lienzo = Canvas(self.fm_prin, width=self.ancho_x, height=self.alto_y)
       self.lienzo.pack(expand = YES, fill = BOTH)
       
       ancho_lienzo = self.lienzo.winfo_width()
       alto_lienzo = self.lienzo.winfo_height()
       #imagen de fondo inicial
       img = Image.open('./iconos/fondo_visor.jpg')
       img = img.resize((self.ancho_x, self.alto_y), Image.ANTIALIAS)
       tkimg = ImageTk.PhotoImage(img)
       self.img_frames = self.lienzo.create_image(ancho_lienzo/2, alto_lienzo/2, anchor=NW, image = tkimg)
       self.lienzo.image = tkimg   # para mantener la referencia
       # ahora ato los botones y teclas al canvas. PENDIENTE
       #self.raiz.bind("<Button-3>", self.onclick_derecho)
       #self.raiz.bind("<Button-1>", self.onclick_izquierdo)
       #self.raiz.bind("<B1-Motion>", self.raton_drag)
       #self.raiz.bind("<Key>", self.OnKeyboardEvent)
       
       # ahora intentamos conectar con el servidor
       try:
           self.sock.connect((self._HOST, self._PORT))
       except socket.error:
           print("No se ha arrancado el servidor. Saliendo...")
           self.salir()

   
   def crea_menu(self):
       #primero asignamos los iconos del menu
       self.icono_trans = PhotoImage(file='./iconos/transmitir.png')
       self.icono_notrans = PhotoImage(file='./iconos/no_transmitir.png')
       self.icono_salir_prog = PhotoImage(file='./iconos/salir_prog.png')
       
       # se inicia con la creacion de la barra del menu
       self.menu_barra = Menu(self.raiz,font="size 10")
       #-----------------------------------
       # primer menu y submenus
       menu1 = Menu(self.menu_barra, tearoff=0)
       
       menu1.add_command(label='Inicia Recepcion transmision', font="size 10",\
               compound='left', image=self.icono_trans, command = self.cambia)
       menu1.add_command(label='Para Transmision Escritorio', font="size 10",\
               compound='left', image=self.icono_notrans, command = self.para_transmision)
               
       menu1.add_separator()
       
       menu1.add_command(label='Salir', font="size 10", \
                   compound='left', image=self.icono_salir_prog, \
                   command=self.salir)
   
       self.menu_barra.add_cascade(label='Menu de Prueba', menu=menu1)
       
       #terminamos el menu
       self.raiz.config(menu=self.menu_barra)
       
       
   # ====== termina la gui =====

   def salir(self):
       self.inicia_baja()
       print("Saliendo del programa. Espere.")
       self.a = 1
       time.sleep(2)
       
       self.raiz.destroy()
       os._exit(0)
   
   
   def para_transmision(self):
       
       hilo_baja = threading.Thread(target=self.inicia_baja())
       self.hilos.append(hilo_baja)
       hilo_baja.start()
       
       #self.sock.shutdown(1)
       print ('Parada en progreso. Espere')
       time.sleep(2)
       print("Recepción parada")  
       
       self.a = 1
       
       
   
   def inicia_alta(self):
       dicc = {
           'tipoMensaje' : 'altaCliente',
           'usuario' : 'Calvicius'
       }
       datos_envio = pickle.dumps(dicc)
       msg = struct.pack('>I', len(datos_envio)) + datos_envio
       try:
           self.sock.sendall(msg)
       except socket.error:
           print("Parece que el servidor no esta arrancado")
       
       
   def inicia_baja(self):
       dicc = {
           'tipoMensaje' : 'bajaCliente',
           'usuario' : 'Calvicius'
       }
       datos_envio = pickle.dumps(dicc)
       msg = struct.pack('>I', len(datos_envio)) + datos_envio
       try:
           self.sock.sendall(msg)
       except socket.error:
           print("Parece que el servidor no esta arrancado")
   
   
   def cambia(self):
       self.a = 0
       t1 = threading.Thread(target=self.inicia_alta)
       t1.start()
       t1.join()
       
       t = threading.Thread(target=self.cargar_imagen)
       self.hilos.append(t)
       t.start()
       
   

   
   def cargar_imagen(self):
       #self.a = 0
       contador = 0
       while self.a == 0:
           """
           Recibe un mensaje entrante del cliente y lo desempaqueta.
           """
           
           data = None
           diccionario = {}
           # Recupera los primeros 4 bytes del mensaje
           tot_len = 0
           while tot_len < self.RECV_MSG_LEN:
                   msg_len = self.sock.recv(self.RECV_MSG_LEN)
                   tot_len += len(msg_len)
           
           # Si el mensaje tiene los 4 bytes que representan la longitud ...
           if msg_len:
               data = b''
               # Desempaqueta el mensaje y obtiene la longitud del mensaje
               msg_len = struct.unpack('>I', msg_len)[0]
               
               tot_data_len = 0
               while tot_data_len < msg_len:
                   # Recupera el fragmento del tamaño máximo de RECV_BUFFER
                   chunk = self.sock.recv(self.RECV_BUFFER)
                   # Si no hay el pedazo esperado ...
                   if not chunk:
                       data = None
                       break # ... Simplemente sale del bucle
                   else:
                       # Une el contenido de los pedazos
                       data += chunk
                       tot_data_len += len(chunk)
           
           # ahora tenemos que hacer al reves que profesor.py para mostrar la captura de pantalla
           if data:
               dicc = pickle.loads(data)
               if dicc['tipoMensaje'] == 'envioPantalla':
                   image = Image.open(io.BytesIO(dicc['pantalla']))
                   tkimg = ImageTk.PhotoImage(image)
                   self.lienzo.itemconfig(self.img_frames, image = tkimg)
                   self.lienzo.image = tkimg   # para mantener la referencia
                   


def inicia_cliente():
   NOMBRE_PROGRAMA = 'Visor Recibe Escritorio'
   ancho_x = 600
   alto_y = 600
   
   root = Tk()
   root.geometry(str(ancho_x) + 'x' + str(alto_y) +'+500+1')   # la diferencia es el alo del menu
   root.title(NOMBRE_PROGRAMA)

   #img = PhotoImage(file='./iconos/pantalla.png')
   #root.tk.call('wm', 'iconphoto', root._w, img)

   cierra_win = RecibirEscritorio(root, ancho_x, alto_y)
   root.protocol('WM_DELETE_WINDOW', cierra_win.salir)

   root.mainloop()


if __name__ == '__main__':
   inicia_cliente()
Responder


Salto de foro:


Usuarios navegando en este tema: 1 invitado(s)