Secuencia sincrónica en un proceso no determinista

Secuencia sincrónica en un proceso no determinista
Sin comentarios Facebook Twitter Flipboard E-mail
josejuan

Jose Juan

    Un sistema concurrente suele resolver problemas de naturaleza intrínsecamente no determinista, lo que quiere decir que no podemos predecir la secuencia exacta en la que ocurrirán ciertos eventos.
    Por otro lado, existen procesos deterministas que, por su complejidad, nos resultan más cómodos de resolver usando enfoques no deterministas, como la programación guiada por eventos o como cuando para un algoritmo secuencial sólo conocemos otro paralelo en el que desconocemos el orden en que terminarán los subprocesos.
    Así, aunque el título no sea impactante, trataremos de poner un poco de orden en este tipo de problemas disfrutando de la fascinante búsqueda de algoritmos ¿te animas?.

El problema

Por poner un ejemplo de lo que queremos conseguir, piensa en la estructura de directorios. Podemos tener en la raíz los subdirectorios A\, B\, ... y a su vez en cada uno de ellos A\A\ y A\B\, ... y a su vez A\A\A\, A\A\B\, ... siendo las letras tan sólo una forma de ilustrar el ejemplo.

Supón que nos piden procesar todos los archivos con un complejo proceso en el que el tiempo de cómputo varía notablemente entre unos archivos y otros; según se procesan, hay que almacenar el resultado en una unidad de cinta en la que los resultados deben seguir el mismo orden que en el árbol, por ejemplo, supón los siguientes archivos:


A\
  +- A\
  |    +- aa.txt
  |    |
  |    +- ab.txt
  +-B\
  |   +- A\
  |   |    |
  |   |    +- abaa.txt
  |   |    |
  |   |    +- abab.txt
  :   :    :

En tal caso, a la cinta se le deben enviar los datos en el siguiente orden: A\, A\A\, A\A\aa.txt, A\A\ab.txt, A\B\, A\B\A\, A\B\A\abaa.txt, A\B\A\abab.txt, etc...

Lógicamente el algoritmo trivial consiste en procesar uno a uno los archivos e ir escribiéndolos en la cinta pero claro, el proceso de cada archivo es muy complejo (y no paralelizable) y tampoco es plan de desaprovechar nuestra última y flamante máquina con 32 procesadores.

Una posible solución

En realidad, hay montones de soluciones que podemos aplicar a nuestro problema: podemos procesar todos los archivos e ir replicando el árbol de directorios con las soluciones (aunque haríamos muchas más escrituras en disco de las necesarias y tendríamos que leerlas otra vez al pasar a cinta), si tenemos suficiente memoria, podemos crear un diccionario e ir insertando los resultados, cuando aparezca el primero lo enviamos a la cinta, cuando aparezca (o ya estuviera) el segundo lo enviamos también (aunque es poco elegante, porque hace mucho trabajo innecesario), etc...

Es obvio que si paralelizamos el proceso y el primero que hay que enviar a la cinta es el que más tarda de todos, necesitamos tanta memoria intermedia ¡como el total de los nodos!, pero en general, podremos reducir el uso de memoria a la mínima necesaria (en media) para almacenar tantos resultados como sean generados durante el tiempo medio de cómputo de un nodo (que a su vez depende de otros factores como el el nº de procesadores, planificador de tareas [no es lo mismo que se asigne procesador por nodo hasta completar que se inicien todos a la vez y roten], velocidad a la que se genera el dato de respuesta, etc...).

Pasando el testigo

Si nos fijamos en el árbol, cuando estamos en un nodo, éste hará su tarea y mientras se ejecutan sus hijos esperará a que le llegue el turno para poder escribir en la cinta, escribirá en la cinta y pasará el testigo a sus hijos, los hijos se irán pasando el testigo de forma ordenada y luego, lo devolverán al padre. Quizás se vea mejor con una imagen:

Nodo de un árbol de procesos asíncronos con un subflujo síncrono

En el dibujo, el flujo azul son los procesos que lanzamos en paralelo, algunos como pasar por un directorio no consumirán mucho tiempo (en otros procesos podría ser que sí) pero el ejemplo de la cinta es un ejemplo, debemos tratar de generalizar. Dichos procesos se ejecutan todos en paralelo y no sabemos en que orden terminarán. Por contra, el flujo verde es el orden en el que deben enviarse los datos a la cinta y como se ve, no puede hacerse en paralelo (el cálculo sí, que es lo azul, la escritura en cinta no, que es lo verde).

¿Y que son las estrellas verdes?, en realidad, nosotros estamos sincronizando el árbol de procesos en recorrido preorden (primero padre y luego hijos), pero en cada una de esas estrellas podemos poner una "marca de sincronía", es decir, en una, en dos o en las tres. Por otro lado, el hecho de que el árbol sea binario, no le quita generalidad (puede crearse un nodo intermedio para hacerlo terciario, al terciario para hacerlo cuaternario, etc...), incluso es posible cambiar el orden del flujo sincrónico en función de los resultados intermedios (aunque podría hacerse muy complicado y ser preferible un diccionario global, por ejemplo). Una rama completa podría verse así:

Flujo sincrónico en un árbol asíncrono no determinista

Una implementación en C#

El dibujo anterior, nos da la clave para implementar el proceso, hay un estado que debemos propagar por todo el árbol, ese estado es el valor inicial para cada nodo { waitInput, waitOutput } y para los que tenemos que crear estados intermedios (los pasos de testigo). También hay un "paso rápido de testigo" en las hojas del árbol, digamos que se pasan el testigo de la mano izquierda a la derecha. Creo que se ve mejor leyendo directamente el código:

view raw SyncLink.cs hosted with ❤ by GitHub

Como ejemplo de uso, podemos implementar paralelamente el cálculo del archiconocido fibonacci, en el que iremos generando sincronizadamente los parámetros de la función. Así, dada la recurrencia "fib(n) = fib(n - 1) + fib(n - 2)" ejecutándose en paralelo, los hijos de "fib(n - 1)" tardarán más que el propio "fib(n - 2)", aunque éste saldrá después que ellos. La función fibonacci, con el proceso de salida parametrizado, usando nuestra clase anterior sería:

static int fibonacci(SyncLink s, int n, Action<int> action) {
Thread.Sleep(1000); // para ver cómo procesos rápidos esperan a otros lentos
if(n < 2)
return s.End(() => action(n), () => 1);
else
return s.Fork(() => action(n)
, ss => fibonacci(ss, n - 1, action)
, ss => fibonacci(ss, n - 2, action)
, (a, b) => a + b);
}
view raw fibonacci.cs hosted with ❤ by GitHub

Así, si queremos imprimir directamente todos los nodos según se van "rastreando", escribiríamos sin más:

var r =
SyncLink.Run(
s => fibonacci(s, 4, n => Console.WriteLine(n)),
false);
Console.WriteLine("Fibonacci: {0}", r);
/*
4
3
2
1
0
1
Fibonacci: 5
2
1
0
*/

Para los interesados (que sé que los hay ;) esa s es un estado perfecto para meterlo en una mónada (que en el caso de mi código C# no queda oculta en la sintaxis, cuestión interesante de indagar).

¿Y que pasa si lo que queremos es obtener una secuencia como resultado de los subproductos sincrónicos?, en tal caso, nuestra acción consistirá en insertar los elementos en un FIFO, afortunadamente tenemos una ideal para el caso, por lo que podemos reutilizar nuestra función anterior haciendo:

static IEnumerable<int> enumerableFib(int N) {
var q = new BlockingCollection<int>();
new Thread(() => {
SyncLink.Run(s => fibonacci(s, N, n => q.Add(n)));
// recuerda que por defecto Run espera a terminar
q.CompleteAdding();
}).Start();
return q.GetConsumingEnumerable();
}

De este modo, disponemos de nuestra secuencia fibonacci, eficientemente paralelizada (usando todos nuestros procesadores) y a la que accedemos de forma transparente como si de la versión secuencial se tratara, por ejemplo:

foreach(var n in enumerableFib(4))
Console.WriteLine("e ~> {0}", n);
/*
e ~> 4
e ~> 3
e ~> 2
e ~> 1
e ~> 0
e ~> 1
e ~> 2
e ~> 1
e ~> 0
*/

¡Pero bueno ¿y que pasa con nuestra cinta?!

¡Oh!, bueno, voy a clonar un repositorio cualquiera y vamos a listar en riguroso orden cada archivo calculando su hash. La función podría ser así (a mi me resulta cómodo usar clausuras), que, como antes, permite exportar una función aparentemente secuencial pero que paraleliza todo el trabajo, haciendo que tanto el disco como los procesadores estén echando humo:

static IEnumerable<Tuple<string, string>> enumDirectoryHashes(string root) {
var q = new BlockingCollection<Tuple<string, string>>();
Func<SyncLink, IEnumerable<string>, bool> branchs = null;
Func<SyncLink, string, bool> node = (s, path) => {
if(File.Exists(path)) {
var hash = GetMD5HashFromFile(path); // éste se hace también en paralelo
s.End(() => q.Add(Tuple.Create(path, hash)), // sólo la escritura del resultado es sincrónica
() => false);
} else
branchs(s,
Directory.EnumerateFiles(path, "*.*", SearchOption.TopDirectoryOnly).Concat(
Directory.EnumerateDirectories(path, "*.*", SearchOption.TopDirectoryOnly))
.OrderBy(k => k)
);
return false; // podríamos calcular algún total sobre el árbol
};
branchs = (s, paths) => {
if(!paths.Any())
return s.End(() => { }, () => false); // nada que hacer
var head = paths.First();
var tail = paths.Skip(1);
if(!tail.Any())
return node(s, head);
// paralelización n-aria (como decía, basta ir añadiendo nodos
// para convertir un árbol binario en otro n-ario)
return s.Fork(() => { },
ss => node(ss, tail.First()),
ss => branchs(ss, tail.Skip(1)),
(a, b) => false);
};
new Thread(() => {
SyncLink.Run(s => node(s, root));
q.CompleteAdding();
}).Start();
return q.GetConsumingEnumerable();
}

La forma de usarla, es muy sencilla, puesto que obtenemos una enumeración:

foreach(var hashedFile in enumDirectoryHashes(@"d:\tmp\SignalR"))
Console.WriteLine("{0}: {1}", hashedFile.Item2, hashedFile.Item1);
/*
62e6064204c49910fd8b71dca206a4a5: d:\tmp\SignalR\.gitattributes
99914b932bd37a50b983c5e7c90ae93b: d:\tmp\SignalR\build\_Chutzpah.coverage.json
6985c7123edda3e70cc160947e02681c: d:\tmp\SignalR\build\applicationHost.config
07667ca80fc1044c16e711d3ebd18f41: d:\tmp\SignalR\build\Build.tasks
4268e58481e987e5287e6127911a8a6d: d:\tmp\SignalR\build\Microsoft.AspNet.SignalR.OpenCover.targets
907c17a99bf4ddc55e90b5abcffccea7: d:\tmp\SignalR\build\Microsoft.AspNet.SignalR.Projects.Properties.proj
6b04760191f49fe302f9a361f943872a: d:\tmp\SignalR\build\Microsoft.AspNet.SignalR.versions.targets
116bcbec24fd0fbd91ea0bed25247a47: d:\tmp\SignalR\build\Microsoft.Bcl.Build.Tasks.dll
55c55f2d0feabffc78fa2e84b17a8cdf: d:\tmp\SignalR\build\MSBuild.Community.Tasks.Targets
8e610cbd9d327c3db71c7bd92b2fe4d6: d:\tmp\SignalR\build\MSBuild.Community.Tasks.xsd
3497c7e29438aebd12856a89c910ca7d: d:\tmp\SignalR\build\scan_tests.ps1
4543304bafae6e5a04674ec83c505e9b: d:\tmp\SignalR\build.sh
898e123b27d1950783274be37c0efd54: d:\tmp\SignalR\build-ci-perf.cmd
961b419479e342c5d95ca8bd2e4d6341: d:\tmp\SignalR\lib\iOS\Newtonsoft.Json.dll
653198b2d3d26e90a6b6d754f7487453: d:\tmp\SignalR\lib\Newtonsoft.Json.dll
1bcbcc1f6a29d0177d4c55bc779be1c7: d:\tmp\SignalR\makefile
df9243af99b9f4a647272f34788deb1d: d:\tmp\SignalR\Microsoft.AspNet.SignalR.Mono.sln
6b46a00d8d47d29dfb9b3052eb5519c3: d:\tmp\SignalR\nuspecs\Microsoft.AspNet.SignalR.Core.nuspec
598f2b18623dc8eab1c2d84843eb0c4f: d:\tmp\SignalR\nuspecs\Microsoft.AspNet.SignalR.nuspec
a6151cf57041b72c6bdae6903a7c8ba1: d:\tmp\SignalR\nuspecs\Microsoft.AspNet.SignalR.SelfHost.nuspec
566169ea508b7e2851f7503b4729636b: d:\tmp\SignalR\nuspecs\Microsoft.AspNet.SignalR.SqlServer.nuspec
3614e19cf3e113cc67ad1adefedb74bd: d:\tmp\SignalR\nuspecs\Microsoft.AspNet.SignalR.Utils.nuspec
ac90cfbd615d0a81d01021d11c3afa6c: d:\tmp\SignalR\readmes\SelfhostReadme.txt
b650f026d36b20af2618cc3b44b0b0a1: d:\tmp\SignalR\src\Microsoft.AspNet.SignalR.Client\ConnectionExtensions.cs
99d55fdf15184bd62b20427f0d37954e: d:\tmp\SignalR\src\Microsoft.AspNet.SignalR.Client\HeaderDictionary.cs
3a65d00b31909d6175dbffe57cb8f1cc: d:\tmp\SignalR\src\Microsoft.AspNet.SignalR.Client\Http\HttpHelper.cs
(y un larguísimo etcétera)
*/

Nota sobre la implementación aportada

El código mostrado es de juguete, está escrito a vuelapluma y seguramente tendrá fallos y muchas potenciales mejoras (por ejemplo, crear un método ForkEnum con la estrategia usada en el procesamiento de directorios). También hay otras muchas formas de enfocar el problema, aquí (que es la fuente de la que tomé el problema) enfocan la solución usando el soporte a futuribles de F# (a mi me gusta más mi solución de semáforos y de implementar una acción genérica y no una enumeración de forma directa :P).

Nota sobre la programación paralela

En general la programación paralela no es fácil y menos si tenemos que gestionar procesos concurrentes no deterministas. Pero eso no es óbice para que, como programadores no intentemos hacer que nuestros procesos sean eficientes y usen todos los recursos disponibles (en este caso disco y procesador). ¿Os suena que los usuarios se quejan de que los dispositivos (en general) tienen cada vez más núcleos pero no los aprovecha el software?...

Argumentar que el código se complica innecesariamente, facilidad de mantener, etc... para mí no son razones para no producir un resultado mejor. Ésto no quita, para que dichos argumentos sean restricciones en nuestro problema y por tanto una solución "compleja" no sea, en ese caso, la mejor solución. Cada cual en su contexto, juzgará el equilibrio necesario para resolver el problema.

Comentarios cerrados
Inicio
×

Utilizamos cookies de terceros para generar estadísticas de audiencia y mostrar publicidad personalizada analizando tu navegación. Si sigues navegando estarás aceptando su uso. Más información