Introduction au codes fontaines : LT Codes avec Python

En 1998, Michael Luby publie le Luby Transform Code (LT Code), un algorithme de correction d’erreur appartenant à la classe de codes « fontaine », un type de code correcteur pouvant générer un nombre infini de paquets permettant de reconstituer des données perdues lors de transferts à travers différents réseaux.

Illustration du principe de « code fontaine »

Ce type d’algorithme est particulièrement utile dans le cas de transmission sans aucun moyen de demander à récupérer une nouvelle fois les données, par exemple dans les cas suivants :

  • Où il n’y a physiquement qu’un sens de communication (une diode réseau, en cybersécurité) ;
  • Où le receveur n’a pas assez d’énergie ou n’est pas assez puissant pour requêter une nouvelle fois les données (une transmission spatiale) ;
  • Où n’importe quelle suite de paquets peut être reçus dans n’importe quel ordre et répartis complètement aléatoirement entre les récepteurs (transmission Peer-To-Peer) :
  • Etc.

Ce n’est pas le seul type d’algorithme à résoudre ce genre de problème. Le domaine des codes correcteur est très vaste, et le but de cet article est uniquement de proposer une approche simple au fonctionnement des LT Code. D’autres codes fontaines ont d’ailleurs aujourd’hui de meilleures performances, citons par exemple Raptor Codes / Online Codes et RaptorQ.

Nous verrons alors le fonctionnement basique des LT Codes et comment l’implémenter en Python de manière optimisée pour les communications à haut débit avec des vitesse de 100 MB/s  à 300 MB/s. le code source est en ligne sur Github.

fonctionnement général des LT CODES

Deux principes sont fondamentaux : les LT Codes doivent générer de nouveaux paquets –appelés symboles– à la volée, et plusieurs de ces symboles combinés ensembles peuvent permettent de reconstruire les données originelles.

La création des symboles (l’encodage) consiste à prendre aléatoirement des blocs de données initiales et de les combiner entre eux. L’opération utilisée est le XOR (notée \oplus) pour sa faculté à pouvoir effectuer l’opération inverse. Par exemple, soit s un symbole tel que s = b1 \oplus b2, alors :

  • Connaissant b1, on retrouve b2 tel que b2 = b1 \oplus s ;
  • Connaissant b2, on retrouve b1 tel que b1 = b2 \oplus s.

Prenons l’exemple suivant, où nous partageons nos données en N=5 blocs de taille identique et tirons aléatoirement 6 combinaisons :

Figure 1: Exemple de graphe biparti d’un LT Code.

Nous avons alors :

    \begin{align*}b1 = s1\\b2 \oplus b3 = s2\\b3 \oplus b4 = s3\\b3 \oplus b4 = s4\\b1 \oplus b3 \oplus b4 = s5\\b1 \oplus b4 = s6\end{align*}

Comme d’autres codes fontaines, l’encodage et décodage des LT Codes reposent sur la théorie des graphes et peuvent se représenter par un graphe biparti (figure 1). Le degré d’un symbole est défini comme étant son nombre de liaisons : s1 est de degré 1, s2\ s3\ s4 et s6 sont de degrés 2 et s5 est de degré 3.

Après avoir envoyé les symboles par paquets (où chaque symbole est transmis avec sa liste de blocs associée), le récepteur reçoit les paquets sauf le paquet contenant le symbole s4, perdu :

Figure 2: Exemple de réception des symboles et leur blocs associés. Les blocs et symboles non complets ou non reçus sont grisés.

Il ne nous reste alors plus qu’à décoder en cherchant les symboles qui correspondent directement à un bloc puis résoudre partiellement les autres symboles qui sont associés à ce même bloc.

Observant que b1 = s1, il est alors possible de XORer s5 et s6 avec la valeur de b1 (étape 1). Grâce à cela, s6 résout b4 dont il est directement égal. On peut alors XORer s3 et s5 avec b4 (étape 2). Les deux symboles s3 et s5 nous permettent de résoudre b2 et b3, les deux derniers blocs inconnus (étape 3).

Figure 3: Exemple de décodage avec 5 des 6 symboles reçus.

Au final, le décodage aura été effectué de manière optimale : k symboles reçus (ou paquets) auront été suffisants à décoder les k blocs. Le symbole s2 n’aura servi à rien et la perte du symbole s4 n’aura pas été pénalisante. Ce processus est quelques fois appelé « propagation des convictions » (en Anglais « belief propagation » ou BP).

Il y a plusieurs conditions nécessaires à un bon décodage :

  • Le décodeur doit posséder au moins k symboles si k blocs doivent être décodés.
  • Au moins un des symboles doit être de degré 1 (c’est-à-dire avoir un seul lien), pour décoder un bloc qui servira lui-même à décoder d’autres symboles, et ainsi de suite. C’est le « point d’entrée » du décodeur.

En pratique, si la transmission est soumise à un fort taux de perte de paquets alors il est préférable d’envoyer plusieurs symboles de degré 1 pour avoir plusieurs « points d’entrées ». En revanche, n’envoyer que des symboles de degré 1 revient à envoyer les données sans encodage et l’intérêt du code correcteur est perdu.

D’une autre manière, plus un symbole a un degré élevé et plus il pourra, à lui seul, être utile à la récupération de plusieurs blocs si ses autres blocs sont résolus. Par exemple, s5 nous permet à lui seul de pouvoir récupérer b1, b3 ou b4 si l’un de ces 3 est manquant et que les autres sont déjà résolus.

Enfin, étant donné le caractère non-déterministe des LT Codes, il n’est pas certain qu’il soit possible de résoudre tous les N blocs avec k symboles donnés si le tirage aléatoire est malchanceux. Prenons l’exemple suivant :

Figure 4: Exemple de décodage incomplet. Seul b1 est récupéré.

Dans ce cas, seul le degré de s6 a été changé de 2 à 1, empêchant le décodage complet. Augmenter les degrés des autres symboles est une solution pour résoudre le problème. Le choix du bon degré pour encoder les symboles est en fait un axe de recherche très important des LT Codes et représente un des leviers d’optimisation les plus importants de l’algorithme.

Une autre technique permet de résoudre ce problème en désactivant temporairement les liens provenant de certains blocs. Par exemple, en désactivant les liens de b2 alors s2 peut résoudre partiellement b3 et s3 peut résoudre partiellement b4, et ainsi de suite. Cette technique,  appelée « inactivation decoding » requiers des mathématiques plus complexes et ne sera pas abordée ici.

Optimisation des degrés : distribution idéale théorique

Le but du jeu consiste donc à déterminer à quelle fréquence des symboles de degré d seront créés. Michael Luby présente d’abord dans ses recherches une distribution appelée « ideal soliton distribution », dont la probabilité d’obtenir un degré d est telle que :

Figure 5: exemple de la distribution idéale théorique avec N=25.

p(k) = \begin{cases}\frac{1}{k}, & k=1 \\ \frac{1}{k(k-1)}, & k=2, 3, ..., N \end{cases}

Les petits degrés sont favorisés, avec l’accent sur la création de symboles issus de 2 blocs. Notez également la probabilité à 0 d’avoir un symbole issu de 0 blocs (ce qui est inutile). Voici son implémentation en Python :

def ideal_distribution(N):
    """ Create the ideal soliton distribution. 
    In practice, this distribution gives not the best results
    Cf. https://en.wikipedia.org/wiki/Soliton_distribution
    """

    probabilities = [0, 1 / N]
    probabilities += [1 / (k * (k - 1)) for k in range(2, N+1)]
    probabilities_sum = sum(probabilities)

    assert probabilities_sum >= 1 - EPSILON and probabilities_sum <= 1 + EPSILON, "The ideal distribution should be standardized"
    return probabilities

Dans l’idée, cette distribution minimise le nombre de symboles à créer avant qu’il soit possible de commencer un décodage. En pratique, la distribution ne donne pas de bons résultats et beaucoup de décodages restent bloqués pour peu qu’il n’y ait pas eu de symbole créé à partir d’un bloc manquant, possibilité accentuée par le fait que les petits degrés sont favorisés.

Optimisation des degrés : distribution robuste

Une autre distribution, appelée « robust soliton distribution » a pour but de mixer les « petits » degrés avec des degrés plus « grands » tel que nous l’avons supposé en fin de partie précédente. L’idée est d’ajouter un pic de probabilité pour un degré M (avec M < N), et de lisser le reste. Les valeurs sont définies telle que :

t_{d} = \begin{cases}\frac{1}{dM}, & d=1, 2, ..., M-1\\ \frac{\ln(N / (M \times\delta))}{M}, & d=M\\ 0, & d=M+1, ..., N \end{cases}

Avec delta un paramètre introduisant une probabilité d’échec du décodage. Puis, étant donné qu’il faut que la somme des probabilités soit égale à 1, on normalise :

p_{d} = p^{ISD}_{d} + t_d

Figure 6: Exemple de la distribution robuste théorique avec N=25.

p^{RSD}_{d} = \frac{p_d}{\sum_{d=1}^N p_d}

Alors, la distribution favorise à la fois petits degrés et les hauts degrés. Dans l’exemple ci-dessus, nous avons choisi M=13. De manière générale, choisir un M équivalent à environ la moitié du nombre de blocs N paraît être une valeur raisonnable. Voici l’implémentation de la distribution en Python :

ROBUST_FAILURE_PROBABILITY = 0.01

def robust_distribution(N):
    """ Create the robust soliton distribution. 
    This fixes the problems of the ideal distribution
    Cf. https://en.wikipedia.org/wiki/Soliton_distribution
    """

    # The choice of M is not a part of the distribution ; it may be improved
    # We take the median and add +1 to avoid possible division by zero 
    M = N // 2 + 1 
    R = N / M

    extra_proba = [0] + [1 / (i * M) for i in range(1, M)]
    extra_proba += [math.log(R / ROBUST_FAILURE_PROBABILITY) / M]  # Spike at M
    extra_proba += [0 for k in range(M+1, N+1)]

    probabilities = np.add(extra_proba, ideal_distribution(N))
    probabilities /= np.sum(probabilities)
    probabilities_sum = np.sum(probabilities)

    assert probabilities_sum >= 1 - EPSILON and probabilities_sum <= 1 + EPSILON, "The robust distribution should be standardized"
    return probabilities

Cette solution se révèle efficace puisqu’une partie des symboles sert directement à construire les blocs (petits degrés) et une autre partie des symboles sert à palier aux mauvaises réceptions et décodages impossibles (grands degrés).

Systematic LT Codes

Certains codes fontaines se disent « systématique » ou « non-systématique » (cf. Systematic Code), et certains codes peuvent très bien être l’un ou l’autre : c’est le cas des Raptor Codes, et aussi le cas des LT Codes.

En théorie des codes, un code systématique signifie que les données sont directement encodées telles quelles et que les k premiers symboles correspondent exactement aux k premiers blocs (et donc, de degrés 1). Cela revient en quelque sorte à envoyer une première fois les données complètes puis à encoder la redondance.

Jusqu’à présent, nous avons donc encodé de manière non-systématique. Si nous reprenons notre exemple, alors seuls les symboles s5 et s6 seraient issus de plusieurs blocs à la fois :

Figure 7: Exemple d’encodage dit « systématique ». Les premiers symboles sont directement encodés avec les valeurs des premiers blocs.

Dans l’hypothèse ou la perte de paquet est minime, alors encoder de manière systématique permet d’éviter de perdre du temps à encoder puis décoder les symboles.

Les symboles issus de la combinaison de nombreux blocs (donc avec un degré haut) – comme ici les symboles s5 et s6– offrent alors la possibilité de récupérer de nombreux blocs différents avec un seul symbole.

Encodage

Dans cette partie, je vous propose d’implémenter l’encodage en Python avec une attention particulière à sa rapidité d’exécution. Vous pourrez retrouver le code complet sur Github.

Dans l’hypothèse ou les données à encoder proviennent d’un fichier, la première chose à faire consiste à lire les données et les stocker d’une manière appropriée :

core.PACKET_SIZE = 65536
...

def blocks_read(file, filesize):
    """ Read the given file by blocks of `core.PACKET_SIZE` and use np.frombuffer() improvement.

    Byt default, we store each octet into a np.uint8 array space, but it is also possible
    to store up to 8 octets together in a np.uint64 array space.  
    
    This process is not saving memory but it helps reduce dimensionnality, especially for the 
    XOR operation in the encoding. Example:
    * np.frombuffer(b'x01x02', dtype=np.uint8) => array([1, 2], dtype=uint8)
    * np.frombuffer(b'x01x02', dtype=np.uint16) => array([513], dtype=uint16)
    """

    blocks_n = math.ceil(filesize / core.PACKET_SIZE)
    blocks = []

    # Read data by blocks of size core.PACKET_SIZE
    for i in range(blocks_n):
            
        data = bytearray(file.read(core.PACKET_SIZE))

        if not data:
            raise "stop"

        # The last read bytes needs a right padding to be XORed in the future
        if len(data) != core.PACKET_SIZE:
            data = data + bytearray(core.PACKET_SIZE - len(data))
            assert i == blocks_n-1, "Packet #{} has a not handled size of {} bytes".format(i, len(blocks[i]))

        # Paquets are condensed in the right array type
        blocks.append(np.frombuffer(data, dtype=core.NUMPY_TYPE))

    return blocks

Chaque bloc de donnée lu du type bytes (immuable) est casté vers le type bytearray (muable) pour ensuite permettre l’opération XOR entre deux blocs. Puisque tous les blocs doivent avoir la même taille mais que le fichier en entrée ne permet peut-être pas un découpage en partie égales, un « padding » à droite est ajouté au dernier bloc. Concrètement, cela signifie que si ce dernier bloc ne fait pas la bonne taille, des 0 lui sont rajoutés à la fin.

On utilise la fonction np.from_buffer() qui permet de stocker des tableaux de 64 bits au lieu de 8 bits (par défaut). Cela permet de réduire la dimensionnalité des bytearray et réduire le temps nécessaire pour un XOR.

On définira également une classe Symbol pour des raisons de praticité. Pour chaque symbole à créer, l’encodage itératif suit les étapes suivantes :

  1. Tirer aléatoirement un degré deg en fonction de la distribution des degrés.
  2. Sélectionner uniformément au hasard une quantité deg de blocs distincts, en utilisant comme seed l’index du symbole à créer.
  3. Combiner les blocs en les XORant pour obtenir le nouveau symbole.

La fonction encode() est donc la suivante:

class Symbol:
    __slots__ = ["index", "degree", "data", "neighbors"] # fixing attributes may reduce memory usage

    def __init__(self, index, degree, data):
        self.index = index
        self.degree = degree
        self.data = data

def get_degrees_from(distribution_name, N, k):
    """ Returns the random degrees from a given distribution of probabilities.
    The degrees distribution must look like a Poisson distribution and the 
    degree of the first drop is 1 to ensure the start of decoding.
    """

    if distribution_name == "ideal":
        probabilities = ideal_distribution(N)
    elif distribution_name == "robust":
        probabilities = robust_distribution(N)
    else:
        probabilities = None
    
    population = list(range(0, N+1))
    return [1] + choices(population, probabilities, k=k-1)

def encode(blocks, drops_quantity):

    # Display statistics
    blocks_n = len(blocks)
    assert blocks_n <= drops_quantity, "Because of the unicity in the random neighbors, it is need to drop at least the same amount of blocks"

    # Generate random indexes associated to random degrees, seeded with the symbol id
    random_degrees = get_degrees_from("robust", blocks_n, k=drops_quantity)

    for i in range(drops_quantity):
        
        # Get the random selection, generated precedently (for performance)
        selection_indexes, deg = generate_indexes(i, random_degrees[i], blocks_n)

        # Xor each selected array within each other gives the drop (or just take one block if there is only one selected)
        drop = blocks[selection_indexes[0]]
        for n in range(1, deg):
            drop = np.bitwise_xor(drop, blocks[selection_indexes[n]])
            # drop = drop ^ blocks[selection_indexes[n]] # according to my tests, this has the same performance

        # Create symbol, then log the process
        symbol = Symbol(index=i, degree=deg, data=drop)
        
        yield symbol


L’index du symbole est utilisé comme seed lors du tirage aléatoire des blocs pour permettre de ne pas envoyer la liste complète des blocs utilisés (ou neighbors dans le code), mais uniquement l’index afin que le graphe soit reconstruit côté décodeur. Cette astuce est souvent utilisée dans les codes correcteurs et m’a été proposée par @catid qui l’a lui-même utilisé dans son implémentation de code fontaine, Wirehair.

Créer un tableau en dehors de la boucle contenant les degrés tirés aléatoirement permet d’améliorer la rapidité du code en réduisant le nombre d’appels à random.sample(). En d’autres termes, il vaut mieux tirer une fois plusieurs nombres aléatoires que plusieurs fois un nombre aléatoire.

Mes tests n’ont pas relevé de différence entre l’opérateur Python ^ et np.bitwise_xor() pour XORer les bytearray entre eux. C’est, dit en passant, l’opération la plus lente de l’encodage.

Le code utilise yield dans le but de transférer directement les symboles empaquetés directement au réseau, sans les stocker dans la mémoire RAM.

Décodage

Au contraire de l’encodage qui permettrait d’envoyer les symboles à la volée dès la création, ici la fonction de décodage doit être appelée après la réception des symboles et les traite tous ensemble.

La première chose à faire est de reconstruire le graphe à l’aide des indexes des symboles reçus. Les liens (ici neighbors) sont enregistrés dans des set Python :

def recover_graph(symbols, blocks_quantity):
    """ Get back the same random indexes (or neighbors), thanks to the symbol id as seed.
    For an easy implementation purpose, we register the indexes as property of the Symbols objects.
    """

    for symbol in symbols:
        
        neighbors, deg = generate_indexes(symbol.index, symbol.degree, blocks_quantity)
        symbol.neighbors = {x for x in neighbors}
        symbol.degree = deg

    return symbols

Ensuite, on implémente la procédure de décodage itératif suivant les étapes :

  1. Chercher un symbole à décoder dont le degré est égal à 1
    1. Si un tel symbole existe, passer à (2)
    2. Sinon, le décodage a failli.
  2. Étant donné que le symbole a un seul degré, il n’a qu’un seul bloc associé. Donc les données du bloc à récupérer sont directement les données du symbole.
  3. Le bloc étant résolu, on peut alors chercher les symboles liés au bloc et supprimer leur liaison après avoir XORé leurs données. Ici, la fonction concernée est appelée reduce_neighbors().
  4. Si tous les blocs ont été résolus, arrêter le décodage. Sinon, répéter le processus.

La fonction decode() implémente le décodage complet tandis que reduce_neighbors() propage la résolution des blocs vers les symboles.

def decode(symbols, blocks_quantity):

    symbols_n = len(symbols)
    assert symbols_n > 0, "There are no symbols to decode."

    # We keep `blocks_n` notation and create the empty list
    blocks_n = blocks_quantity
    blocks = [None] * blocks_n

    # Recover the degrees and associated neighbors using the seed (the index, cf. encoding).
    symbols = recover_graph(symbols, blocks_n)
    
    solved_blocks_count = 0
    iteration_solved_count = 0
    start_time = time.time()
    
    while iteration_solved_count > 0 or solved_blocks_count == 0:
    
        iteration_solved_count = 0

        # Search for solvable symbols
        for i, symbol in enumerate(symbols):

            # Check the current degree. If it's 1 then we can recover data
            if symbol.degree == 1: 

                iteration_solved_count += 1 
                block_index = next(iter(symbol.neighbors)) 
                symbols.pop(i)

                # This symbol is redundant: another already helped decoding the same block
                if blocks[block_index] is not None:
                    continue

                blocks[block_index] = symbol.data

                # Update the count
                solved_blocks_count += 1

                # Reduce the degrees of other symbols that contains the solved block as neighbor             
                reduce_neighbors(block_index, blocks, symbols)

    return np.asarray(blocks), solved_blocks_count

def reduce_neighbors(block_index, blocks, symbols):
    """ Loop over the remaining symbols to find for a common link between 
    each symbol and the last solved block `block`

    To avoid increasing complexity and another for loop, the neighbors are stored as dictionnary
    which enable to directly delete the entry after XORing back.
    """
    
    for other_symbol in symbols:
        if other_symbol.degree > 1 and block_index in other_symbol.neighbors:
        
            # XOR the data and remove the index from the neighbors
            other_symbol.data = np.bitwise_xor(blocks[block_index], other_symbol.data)
            other_symbol.neighbors.remove(block_index)

            other_symbol.degree -= 1

L’utilisation des set Python est utile pour améliorer les performances de la fonction reduce_neighbors(), puisqu’il est possible d’accéder à une liaison d’un symbole via son index (par exemple symbol.neighbors[block_index]) plutôt que de parcourir symbol.neighbors via une boucle comme un tableau.

En complexité algorithmique, si d est le degré d’un symbole alors cette optimisation permet de passer de \mathcal{O}(k), k in [0; d[ à \mathcal{O}(1).

Une fois tous les blocs décodés, il suffit de reconvertir les bytearray en bytes et de les écrire dans un fichier :

def blocks_write(blocks, file, filesize):
    """ Write the given blocks into a file"""

    count = 0
    for data in recovered_blocks[:-1]:
        file_copy.write(data)
        count += len(data)

    # Convert back the bytearray to bytes and shrink back 
    last_bytes = bytes(recovered_blocks[-1])
    shrinked_data = last_bytes[:filesize % core.PACKET_SIZE]
    file_copy.write(shrinked_data)

Benchmarks et performance

Implémenté tel quel, l’encodage puis décodage d’un fichier de moins de 1MB est instantané. En mode « systématique » avec une redondance de 1.5x, la vitesse de transfert évolue de 300 MB/s à 10 MB/s pour des fichiers allant de 100 MB à quelques GB.

A titre indicatif, voici plusieurs mesures de temps d’encodage puis décodage sur différents fichiers :

Taille (MB) Blocs Symboles Encodage Décodage
Temps (s) Vitesse (MB/s) Temps (s) Vitesse (MB/s)
1 16 24 0.00 0.00
100 1600 2400 0.21 476.1 0.31 322.5
1200 19200 28800 3.86 310.8 39.82 30.1
2000 32000 48000 6.44 310.5 104.10 19.2
3600 57600 86400 23.14 155.5 426.36 8.4
Figure 8: Mesures indicatives du temps total (encodage et décodage) sur un processeur i5 6200U @ 2.30GHz.

Sans surprise, c’est le décodage et sa complexité algorithmique en minimum \mathcal{O}(N^2) qui limite les performances.

Rapidement, on constate que le décodage devient impraticable plus la taille –et donc le nombre de symboles à gérer– augmente. Une solution simple pour outrepasser cette limitation consisterai à sous-découper un fichier en plusieurs parties.

Conclusion

Le but de cet article était d’aborder les codes fontaines avec un exemple, les LT Codes. Nous avons vu comment encoder des symboles à partir de plusieurs blocs de données pouvait permettre de corriger plusieurs erreurs à la fois.

Nous avons également constaté les limitations du décodage itératif, de moins en moins rapide lorsque la taille des données à encoder augmente. Cependant, d’autres algorithmes de décodage plus performants ont depuis été depuis proposés (une simple recherche Google avec le mot clé « LT Codes » montre combien la recherche est active).

D’autres codes correcteurs existent également. Les Raptor Codes sont par exemple une évolution des LT Codes qui permettent un décodage linéaire quelque soit la taille du fichier.

Vous pouvez retrouver l’intégralité de l’implémentation sur Github, tout comme le script de benchmark et ses résultats.

Références ayant aidé à l’écriture de cet article :

Laisser un commentaire

Votre adresse de messagerie ne sera pas publiée. Les champs obligatoires sont indiqués avec *

Ce site utilise Akismet pour réduire les indésirables. Apprenez comment les données de vos commentaires sont utilisées.